This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/main by this push:
     new f1c8d49  Refactor the network profiling analysis (#55)
f1c8d49 is described below

commit f1c8d493486dff54043fe7a6cd4353593f2ba6bb
Author: mrproliu <[email protected]>
AuthorDate: Thu Oct 13 16:25:39 2022 +0800

    Refactor the network profiling analysis (#55)
---
 pkg/profiling/task/network/analyze/analyze.go      |  33 +
 .../task/network/analyze/base/connection.go        |  93 +++
 pkg/profiling/task/network/analyze/base/context.go | 402 ++++++++++++
 .../task/network/{ => analyze/base}/enums.go       |   2 +-
 pkg/profiling/task/network/analyze/base/events.go  |  70 ++
 .../task/network/analyze/base/listener.go          |  51 ++
 pkg/profiling/task/network/analyze/base/metrics.go |  98 +++
 .../task/network/{ => analyze/base}/tcpresolver.go |   2 +-
 .../{analyzer.go => analyze/base/traffic.go}       | 106 +--
 .../task/network/analyze/layer4/events.go          |  88 +++
 .../task/network/analyze/layer4/listener.go        | 402 ++++++++++++
 .../task/network/analyze/layer4/metrics.go         | 276 ++++++++
 pkg/profiling/task/network/bpf/bpf.go              |  56 ++
 pkg/profiling/task/network/{ => bpf}/linker.go     |   5 +-
 pkg/profiling/task/network/context.go              | 726 ---------------------
 pkg/profiling/task/network/metrics.go              | 396 -----------
 pkg/profiling/task/network/runner.go               | 223 ++++---
 pkg/profiling/task/network/ssl.go                  |  86 +--
 18 files changed, 1799 insertions(+), 1316 deletions(-)

diff --git a/pkg/profiling/task/network/analyze/analyze.go 
b/pkg/profiling/task/network/analyze/analyze.go
new file mode 100644
index 0000000..28cda1e
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/analyze.go
@@ -0,0 +1,33 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package analyze
+
+import (
+       "github.com/apache/skywalking-rover/pkg/process/api"
+       
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
+       
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer4"
+)
+
+// NewContext Wrap the analyzer builder
+func NewContext(monitorProcesses map[int32][]api.ProcessInterface) 
*base.AnalyzerContext {
+       context := base.NewAnalyzerContext(monitorProcesses)
+       // register all listeners
+       context.AddListener(layer4.NewListener())
+
+       return context
+}
diff --git a/pkg/profiling/task/network/analyze/base/connection.go 
b/pkg/profiling/task/network/analyze/base/connection.go
new file mode 100644
index 0000000..54f3940
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/base/connection.go
@@ -0,0 +1,93 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package base
+
+import "github.com/apache/skywalking-rover/pkg/process/api"
+
+type ConnectionContext struct {
+       // basic metadata
+       ConnectionID     uint64
+       RandomID         uint64
+       LocalPid         uint32
+       SocketFD         uint32
+       LocalProcesses   []api.ProcessInterface
+       ConnectionClosed bool
+       Protocol         ConnectionProtocol
+       IsSSL            bool
+
+       // socket metadata
+       Role       ConnectionRole
+       LocalIP    string
+       LocalPort  uint16
+       RemoteIP   string
+       RemotePort uint16
+
+       // metrics
+       Metrics *ConnectionMetrics
+
+       // Flush the data content to the oap count
+       FlushDataCount int
+}
+
+func (c *AnalyzerContext) NewConnectionContext(conID, randomID uint64, pid, fd 
uint32, processes []api.ProcessInterface,
+       conClosed bool) *ConnectionContext {
+       connection := &ConnectionContext{
+               // metadata
+               ConnectionID:     conID,
+               RandomID:         randomID,
+               LocalPid:         pid,
+               SocketFD:         fd,
+               LocalProcesses:   processes,
+               ConnectionClosed: conClosed,
+
+               Metrics: c.NewConnectionMetrics(),
+       }
+       return connection
+}
+
+type ActiveConnectionInBPF struct {
+       RandomID     uint64
+       Pid          uint32
+       SocketFD     uint32
+       Role         ConnectionRole
+       SocketFamily uint32
+
+       RemoteAddrV4   uint32
+       RemoteAddrV6   [16]uint8
+       RemoteAddrPort uint32
+       LocalAddrV4    uint32
+       LocalAddrV6    [16]uint8
+       LocalAddrPort  uint32
+
+       WriteBytes   uint64
+       WriteCount   uint64
+       WriteExeTime uint64
+       ReadBytes    uint64
+       ReadCount    uint64
+       ReadExeTime  uint64
+
+       WriteRTTCount   uint64
+       WriteRTTExeTime uint64
+
+       // Protocol analyze context
+       Protocol ConnectionProtocol
+       IsSSL    uint32
+
+       // the connect event is already sent
+       ConnectEventIsSent uint32
+}
diff --git a/pkg/profiling/task/network/analyze/base/context.go 
b/pkg/profiling/task/network/analyze/base/context.go
new file mode 100644
index 0000000..fcda589
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/base/context.go
@@ -0,0 +1,402 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package base
+
+import (
+       "context"
+       "encoding/json"
+       "errors"
+       "fmt"
+       "net"
+       "sync"
+       "unsafe"
+
+       "github.com/apache/skywalking-rover/pkg/process/api"
+       "github.com/apache/skywalking-rover/pkg/profiling/task/network/bpf"
+
+       "github.com/cilium/ebpf"
+
+       "github.com/sirupsen/logrus"
+
+       cmap "github.com/orcaman/concurrent-map"
+
+       "golang.org/x/sys/unix"
+)
+
+type AnalyzerContext struct {
+       // listening process map
+       processes map[int32][]api.ProcessInterface
+
+       // connection handler
+       activeConnections cmap.ConcurrentMap      // current activeConnections 
connections
+       closedConnections []*ConnectionContext    // closed connections'
+       flushClosedEvents chan *SocketCloseEvent  // connection have been 
closed, it is a queue to cache unknown active connections
+       sockParseQueue    chan *ConnectionContext // socket address parse queue
+
+       // analyze listener list
+       listeners []AnalyzeListener
+
+       // close connection modify locker
+       closedConnectionLocker sync.RWMutex
+}
+
+func NewAnalyzerContext(processes map[int32][]api.ProcessInterface) 
*AnalyzerContext {
+       return &AnalyzerContext{
+               processes:         processes,
+               activeConnections: cmap.New(),
+               closedConnections: make([]*ConnectionContext, 0),
+               flushClosedEvents: make(chan *SocketCloseEvent, 5000),
+               sockParseQueue:    make(chan *ConnectionContext, 5000),
+               listeners:         make([]AnalyzeListener, 0),
+       }
+}
+
+func (c *AnalyzerContext) AddListener(l AnalyzeListener) {
+       c.listeners = append(c.listeners, l)
+}
+
+func (c *AnalyzerContext) GetAllConnectionWithContext() []*ConnectionContext {
+       result := make([]*ConnectionContext, 0)
+       result = append(result, c.flushClosedConnection()...)
+       for _, con := range c.activeConnections.Items() {
+               result = append(result, con.(*ConnectionContext))
+       }
+       return result
+}
+
+func (c *AnalyzerContext) RegisterAllHandlers(bpfLoader *bpf.Loader) {
+       // socket connect
+       bpfLoader.ReadEventAsync(bpfLoader.SocketConnectionEventQueue, 
c.handleSocketConnectEvent, func() interface{} {
+               return &SocketConnectEvent{}
+       })
+       // socket close
+       bpfLoader.ReadEventAsync(bpfLoader.SocketCloseEventQueue, 
c.handleSocketCloseEvent, func() interface{} {
+               return &SocketCloseEvent{}
+       })
+       for _, l := range c.listeners {
+               l.RegisterBPFEvents(bpfLoader)
+       }
+}
+
+func (c *AnalyzerContext) RegisterBPFEvents(bpfLoader *bpf.Loader) {
+       for _, l := range c.listeners {
+               l.RegisterBPFEvents(bpfLoader)
+       }
+}
+
+func (c *AnalyzerContext) StartSocketAddressParser(ctx context.Context) {
+       for i := 0; i < 2; i++ {
+               go c.handleSocketParseQueue(ctx)
+       }
+}
+
+func (c *AnalyzerContext) handleSocketParseQueue(ctx context.Context) {
+       for {
+               select {
+               case cc := <-c.sockParseQueue:
+                       socket, err := ParseSocket(cc.LocalPid, cc.SocketFD)
+                       if err != nil {
+                               // if the remote port of connection is empty, 
then this connection not available basically
+                               if cc.RemotePort == 0 {
+                                       log.Warnf("complete the socket error, 
pid: %d, fd: %d, error: %v", cc.LocalPid, cc.SocketFD, err)
+                               }
+                               continue
+                       }
+                       cc.LocalIP = socket.SrcIP
+                       cc.LocalPort = socket.SrcPort
+                       cc.RemoteIP = socket.DestIP
+                       cc.RemotePort = socket.DestPort
+               case <-ctx.Done():
+                       return
+               }
+       }
+}
+
+func (c *AnalyzerContext) handleSocketConnectEvent(data interface{}) {
+       event := data.(*SocketConnectEvent)
+
+       if log.Enable(logrus.DebugLevel) {
+               marshal, _ := json.Marshal(event)
+               log.Debugf("found connect event, json: %s", string(marshal))
+       }
+
+       processes := c.processes[int32(event.Pid)]
+       if len(processes) == 0 {
+               log.Warnf("get process connect event, but this process is don't 
need to monitor, pid: %d", event.Pid)
+               return
+       }
+
+       // build active connection information
+       con := c.NewConnectionContext(event.ConID, event.RandomID, event.Pid, 
event.FD, processes, false)
+       con.Role = event.Role
+       if event.NeedComplete == 0 {
+               con.RemotePort = uint16(event.RemoteAddrPort)
+               con.LocalPort = uint16(event.LocalAddrPort)
+               if event.SocketFamily == unix.AF_INET {
+                       con.LocalIP = parseAddressV4(event.LocalAddrV4)
+                       con.RemoteIP = parseAddressV4(event.RemoteAddrV4)
+               } else {
+                       con.LocalIP = parseAddressV6(event.LocalAddrV6)
+                       con.RemoteIP = parseAddressV6(event.RemoteAddrV6)
+               }
+       } else {
+               // if the remote address exists then setting it
+               if event.RemoteAddrPort != 0 {
+                       con.RemotePort = uint16(event.RemoteAddrPort)
+                       if event.SocketFamily == unix.AF_INET {
+                               con.RemoteIP = 
parseAddressV4(event.RemoteAddrV4)
+                       } else {
+                               con.RemoteIP = 
parseAddressV6(event.RemoteAddrV6)
+                       }
+               }
+               c.sockParseQueue <- con
+       }
+
+       // notify the listeners
+       for _, l := range c.listeners {
+               l.ReceiveNewConnection(con, event)
+       }
+
+       // add to the context
+       c.saveActiveConnection(con)
+}
+
+func (c *AnalyzerContext) handleSocketCloseEvent(data interface{}) {
+       event := data.(*SocketCloseEvent)
+
+       if log.Enable(logrus.DebugLevel) {
+               marshal, _ := json.Marshal(event)
+               log.Debugf("found close event: %s", string(marshal))
+       }
+
+       // try to handle the socket close event
+       if !c.socketClosedEvent0(event) {
+               // is not in active connection, maybe it's not have been added 
to activate first
+               // just add to the close queue, wait for the flush connection 
with interval
+               c.flushClosedEvents <- event
+               return
+       }
+}
+
+func (c *AnalyzerContext) FlushAllMetrics(bpfLoader *bpf.Loader, metricsPrefix 
string) (*MetricsBuilder, error) {
+       metricsBuilder := NewMetricsBuilder(metricsPrefix)
+       err := c.flushMetrics0(bpfLoader, metricsBuilder)
+       if err != nil {
+               return nil, err
+       }
+       return metricsBuilder, nil
+}
+
+func (c *AnalyzerContext) flushMetrics0(bpfLoader *bpf.Loader, builder 
*MetricsBuilder) error {
+       // handling the unfinished close event
+       c.processCachedCloseEvents()
+
+       // get all connections
+       ccs := c.GetAllConnectionWithContext()
+       if len(ccs) == 0 {
+               return nil
+       }
+
+       // prepare to flush metrics
+       err := c.prepareToFlushMetrics(ccs, bpfLoader)
+       if err != nil {
+               return fmt.Errorf("prepare to flush the connection metrics 
failure: %v", err)
+       }
+
+       // combine all connections
+       analyzer := c.NewTrafficAnalyzer()
+       traffics := analyzer.CombineConnectionToTraffics(ccs)
+
+       // generate connections
+       for _, l := range c.listeners {
+               l.FlushMetrics(traffics, builder)
+       }
+
+       // after flush metrics
+       for _, l := range c.listeners {
+               l.PostFlushConnectionMetrics(ccs)
+       }
+       return nil
+}
+
+func (c *AnalyzerContext) prepareToFlushMetrics(ccs []*ConnectionContext, 
bpfLoader *bpf.Loader) error {
+       var active *ActiveConnectionInBPF
+       closedConnections := make([]string, 0)
+       connectionWithBPFList := make([]*ConnectionWithBPF, 0)
+
+       for _, cc := range ccs {
+               active, closedConnections = 
c.lookupTheActiveConnectionInBPf(cc, bpfLoader, closedConnections)
+               connectionWithBPFList = append(connectionWithBPFList, 
&ConnectionWithBPF{
+                       Connection:  cc,
+                       ActiveInBPF: active,
+               })
+       }
+
+       // delete closed connections
+       if len(closedConnections) > 0 {
+               c.deleteConnectionOnly(closedConnections)
+       }
+
+       // call the listeners
+       for _, l := range c.listeners {
+               err := l.PreFlushConnectionMetrics(connectionWithBPFList, 
bpfLoader)
+               if err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+
+func (c *AnalyzerContext) lookupTheActiveConnectionInBPf(connection 
*ConnectionContext, bpfLoader *bpf.Loader,
+       closedConnections []string) (active *ActiveConnectionInBPF, closedRef 
[]string) {
+       var activeConnection ActiveConnectionInBPF
+       // if connection not closed, then load the basic stats from bpf map
+       if !connection.ConnectionClosed {
+               err := 
bpfLoader.ActiveConnectionMap.Lookup(connection.ConnectionID, &activeConnection)
+               if err != nil {
+                       if errors.Is(err, ebpf.ErrKeyNotExist) {
+                               closedConnections = append(closedConnections, 
c.generateConnectionKey(connection.ConnectionID, connection.RandomID))
+                               connection.ConnectionClosed = true
+                       } else {
+                               log.Warnf("lookup the active connection error, 
connection id: %d, error: %v", connection.ConnectionID, err)
+                       }
+                       return nil, closedConnections
+               }
+
+               if log.Enable(logrus.DebugLevel) {
+                       marshal, _ := json.Marshal(activeConnection)
+                       log.Debugf("found the active connection, conid: %d, 
data: %s", connection.ConnectionID, string(marshal))
+               }
+
+               if connection.Role == ConnectionRoleUnknown && 
activeConnection.Role != ConnectionRoleUnknown {
+                       connection.Role = activeConnection.Role
+               }
+               if connection.Protocol == ConnectionProtocolUnknown && 
activeConnection.Protocol != ConnectionProtocolUnknown {
+                       connection.Protocol = activeConnection.Protocol
+               }
+               if !connection.IsSSL && activeConnection.IsSSL == 1 {
+                       connection.IsSSL = true
+               }
+               return &activeConnection, closedConnections
+       }
+       return nil, closedConnections
+}
+
+func (c *AnalyzerContext) deleteConnectionOnly(ccs []string) {
+       for _, cc := range ccs {
+               c.activeConnections.Remove(cc)
+       }
+}
+
+func (c *AnalyzerContext) processCachedCloseEvents() {
+       for len(c.flushClosedEvents) > 0 {
+               event := <-c.flushClosedEvents
+               if !c.socketClosedEvent0(event) {
+                       // if cannot the found the active connection, then just 
create a new closed connection context
+                       processes := c.processes[int32(event.Pid)]
+                       if len(processes) == 0 {
+                               continue
+                       }
+                       cc := c.NewConnectionContext(event.ConID, 
event.RandomID, event.Pid, event.SocketFD, processes, true)
+                       if event.SocketFamily == unix.AF_INET {
+                               cc.RemoteIP = parseAddressV4(event.RemoteAddrV4)
+                               cc.LocalIP = parseAddressV4(event.LocalAddrV4)
+                       } else if event.SocketFamily == unix.AF_INET6 {
+                               cc.RemoteIP = parseAddressV6(event.RemoteAddrV6)
+                               cc.LocalIP = parseAddressV6(event.LocalAddrV6)
+                       } else {
+                               continue
+                       }
+
+                       // append to the closed connection
+                       c.appendClosedConnection(c.combineClosedConnection(cc, 
event))
+               }
+       }
+}
+
+func (c *AnalyzerContext) generateConnectionKey(conID, randomID uint64) string 
{
+       return fmt.Sprintf("%d_%d", conID, randomID)
+}
+
+func (c *AnalyzerContext) socketClosedEvent0(event *SocketCloseEvent) bool {
+       activeCon := c.foundAndDeleteConnection(event)
+       if activeCon == nil {
+               return false
+       }
+
+       // combine the connection data
+       c.appendClosedConnection(c.combineClosedConnection(activeCon, event))
+       return true
+}
+
+func (c *AnalyzerContext) foundAndDeleteConnection(event *SocketCloseEvent) 
*ConnectionContext {
+       conKey := c.generateConnectionKey(event.ConID, event.RandomID)
+       val, exists := c.activeConnections.Pop(conKey)
+       if !exists {
+               return nil
+       }
+       return val.(*ConnectionContext)
+}
+
+func (c *AnalyzerContext) combineClosedConnection(active *ConnectionContext, 
closed *SocketCloseEvent) *ConnectionContext {
+       active.ConnectionClosed = true
+
+       if active.Role == ConnectionRoleUnknown && closed.Role != 
ConnectionRoleUnknown {
+               active.Role = closed.Role
+       }
+       if active.Protocol == ConnectionProtocolUnknown && closed.Protocol != 
ConnectionProtocolUnknown {
+               active.Protocol = closed.Protocol
+       }
+       if !active.IsSSL && closed.IsSSL == 1 {
+               active.IsSSL = true
+       }
+
+       // notify listeners
+       for _, l := range c.listeners {
+               l.ReceiveCloseConnection(active, closed)
+       }
+       return active
+}
+
+func (c *AnalyzerContext) saveActiveConnection(con *ConnectionContext) {
+       c.activeConnections.Set(c.generateConnectionKey(con.ConnectionID, 
con.RandomID), con)
+}
+
+func (c *AnalyzerContext) flushClosedConnection() []*ConnectionContext {
+       c.closedConnectionLocker.Lock()
+       defer c.closedConnectionLocker.Unlock()
+
+       connections := c.closedConnections
+       c.closedConnections = make([]*ConnectionContext, 0)
+       return connections
+}
+
+func (c *AnalyzerContext) appendClosedConnection(con *ConnectionContext) {
+       c.closedConnectionLocker.RLock()
+       defer c.closedConnectionLocker.RUnlock()
+
+       c.closedConnections = append(c.closedConnections, con)
+}
+
+func parseAddressV4(val uint32) string {
+       return net.IP((*(*[net.IPv4len]byte)(unsafe.Pointer(&val)))[:]).String()
+}
+
+func parseAddressV6(val [16]uint8) string {
+       return net.IP((*(*[net.IPv6len]byte)(unsafe.Pointer(&val)))[:]).String()
+}
diff --git a/pkg/profiling/task/network/enums.go 
b/pkg/profiling/task/network/analyze/base/enums.go
similarity index 99%
rename from pkg/profiling/task/network/enums.go
rename to pkg/profiling/task/network/analyze/base/enums.go
index db43f06..d1f37e7 100644
--- a/pkg/profiling/task/network/enums.go
+++ b/pkg/profiling/task/network/analyze/base/enums.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package network
+package base
 
 const (
        unknown = "unknown"
diff --git a/pkg/profiling/task/network/analyze/base/events.go 
b/pkg/profiling/task/network/analyze/base/events.go
new file mode 100644
index 0000000..6fbf4b4
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/base/events.go
@@ -0,0 +1,70 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package base
+
+// SocketConnectEvent Socket have been connection/accept event
+type SocketConnectEvent struct {
+       ConID        uint64
+       RandomID     uint64
+       ExeTime      uint64
+       NeedComplete uint32
+       Pid          uint32
+       FD           uint32
+       FuncName     uint32
+
+       // socket information if exists
+       Role           ConnectionRole
+       SocketFamily   uint32
+       RemoteAddrV4   uint32
+       RemoteAddrV6   [16]uint8
+       RemoteAddrPort uint32
+       LocalAddrV4    uint32
+       LocalAddrV6    [16]uint8
+       LocalAddrPort  uint32
+}
+
+type SocketCloseEvent struct {
+       ConID    uint64
+       RandomID uint64
+       ExeTime  uint64
+       Pid      uint32
+       SocketFD uint32
+       Role     ConnectionRole
+       Protocol ConnectionProtocol
+       IsSSL    uint32
+       Fix      uint32
+
+       SocketFamily   uint32
+       RemoteAddrV4   uint32
+       RemoteAddrV6   [16]uint8
+       RemoteAddrPort uint32
+       LocalAddrV4    uint32
+       LocalAddrV6    [16]uint8
+       LocalAddrPort  uint32
+       Fix1           uint32
+
+       WriteBytes   uint64
+       WriteCount   uint64
+       WriteExeTime uint64
+       ReadBytes    uint64
+       ReadCount    uint64
+       ReadExeTime  uint64
+
+       WriteRTTCount   uint64
+       WriteRTTExeTime uint64
+}
diff --git a/pkg/profiling/task/network/analyze/base/listener.go 
b/pkg/profiling/task/network/analyze/base/listener.go
new file mode 100644
index 0000000..1e9e7d2
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/base/listener.go
@@ -0,0 +1,51 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package base
+
+import (
+       "github.com/apache/skywalking-rover/pkg/profiling/task/network/bpf"
+)
+
+type AnalyzeListener interface {
+       // Name of the listener
+       Name() string
+       // GenerateMetrics generate a metrics context
+       // It would bind to a ConnectionContext or ProcessTraffic automatically
+       GenerateMetrics() ListenerMetrics
+
+       // RegisterBPFEvents register the BPF events
+       RegisterBPFEvents(bpfLoader *bpf.Loader)
+
+       // ReceiveNewConnection call this method when receive a new connection 
event
+       // when return a metrics then It would bind to with the connection
+       ReceiveNewConnection(ctx *ConnectionContext, event *SocketConnectEvent)
+       // ReceiveCloseConnection call this method when receive the connection 
close event
+       ReceiveCloseConnection(ctx *ConnectionContext, event *SocketCloseEvent)
+
+       // PreFlushConnectionMetrics prepare to flush the connection metrics
+       PreFlushConnectionMetrics(ccs []*ConnectionWithBPF, bpfLoader 
*bpf.Loader) error
+       // FlushMetrics flush all metrics from connections
+       FlushMetrics(traffics []*ProcessTraffic, builder *MetricsBuilder)
+       // PostFlushConnectionMetrics after flushing all metrics, usually used 
to refresh the metrics
+       PostFlushConnectionMetrics(ccs []*ConnectionContext)
+}
+
+type ConnectionWithBPF struct {
+       Connection  *ConnectionContext
+       ActiveInBPF *ActiveConnectionInBPF
+}
diff --git a/pkg/profiling/task/network/analyze/base/metrics.go 
b/pkg/profiling/task/network/analyze/base/metrics.go
new file mode 100644
index 0000000..2c1eddf
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/base/metrics.go
@@ -0,0 +1,98 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package base
+
+import (
+       "time"
+
+       v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+)
+
+// ListenerMetrics The Metrics in each listener
+type ListenerMetrics interface {
+       // FlushMetrics Flush the metrics of connection, and merge into self
+       FlushMetrics(connection *ConnectionContext)
+}
+
+type ConnectionMetrics struct {
+       data map[string]ListenerMetrics
+}
+
+func (c *AnalyzerContext) NewConnectionMetrics() *ConnectionMetrics {
+       data := make(map[string]ListenerMetrics)
+       for _, l := range c.listeners {
+               data[l.Name()] = l.GenerateMetrics()
+       }
+       return &ConnectionMetrics{data: data}
+}
+
+func (c *ConnectionMetrics) GetMetrics(listenerName string) ListenerMetrics {
+       return c.data[listenerName]
+}
+
+func (c *ConnectionMetrics) FlushMetrics(connection *ConnectionContext) {
+       for _, metric := range c.data {
+               metric.FlushMetrics(connection)
+       }
+}
+
+type MetricsBuilder struct {
+       prefix  string
+       metrics map[metadata][]*v3.MeterData
+}
+
+func NewMetricsBuilder(prefix string) *MetricsBuilder {
+       return &MetricsBuilder{
+               prefix:  prefix,
+               metrics: make(map[metadata][]*v3.MeterData),
+       }
+}
+
+func (m *MetricsBuilder) AppendMetrics(service, instance string, metrics 
[]*v3.MeterData) {
+       meta := metadata{ServiceName: service, InstanceName: instance}
+       existingMetrics := m.metrics[meta]
+       if len(existingMetrics) == 0 {
+               m.metrics[meta] = metrics
+               return
+       }
+       m.metrics[meta] = append(existingMetrics, metrics...)
+}
+
+func (m *MetricsBuilder) MetricPrefix() string {
+       return m.prefix
+}
+
+func (m *MetricsBuilder) Build() []*v3.MeterDataCollection {
+       collections := make([]*v3.MeterDataCollection, 0)
+       now := time.Now().UnixMilli()
+       for meta, meters := range m.metrics {
+               if len(meters) == 0 {
+                       continue
+               }
+               meters[0].Service = meta.ServiceName
+               meters[0].ServiceInstance = meta.InstanceName
+               meters[0].Timestamp = now
+               collections = append(collections, 
&v3.MeterDataCollection{MeterData: meters})
+       }
+       return collections
+}
+
+type metadata struct {
+       ServiceName  string
+       InstanceName string
+}
diff --git a/pkg/profiling/task/network/tcpresolver.go 
b/pkg/profiling/task/network/analyze/base/tcpresolver.go
similarity index 99%
rename from pkg/profiling/task/network/tcpresolver.go
rename to pkg/profiling/task/network/analyze/base/tcpresolver.go
index 92d2326..dc27cd8 100644
--- a/pkg/profiling/task/network/tcpresolver.go
+++ b/pkg/profiling/task/network/analyze/base/tcpresolver.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package network
+package base
 
 import (
        "encoding/binary"
diff --git a/pkg/profiling/task/network/analyzer.go 
b/pkg/profiling/task/network/analyze/base/traffic.go
similarity index 86%
rename from pkg/profiling/task/network/analyzer.go
rename to pkg/profiling/task/network/analyze/base/traffic.go
index 8eb69bc..99a88a2 100644
--- a/pkg/profiling/task/network/analyzer.go
+++ b/pkg/profiling/task/network/analyze/base/traffic.go
@@ -15,21 +15,50 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package network
+package base
 
 import (
+       "fmt"
+
+       "github.com/apache/skywalking-rover/pkg/logger"
        "github.com/apache/skywalking-rover/pkg/process/api"
        "github.com/apache/skywalking-rover/pkg/tools"
 )
 
+var log = logger.GetLogger("profiling", "task", "network", "analyze")
+
 const (
        layerMeshDP  = "MESH_DP"
        layerMeshApp = "MESH"
        processEnvoy = "envoy"
 )
 
+type ProcessTraffic struct {
+       Analyzer *TrafficAnalyzer
+
+       // local process information
+       LocalIP        string
+       LocalPort      uint16
+       LocalPid       uint32
+       LocalProcesses []api.ProcessInterface
+
+       // remote process/address information
+       RemoteIP        string
+       RemotePort      uint16
+       RemotePid       uint32
+       RemoteProcesses []api.ProcessInterface
+
+       // connection basic information
+       Role     ConnectionRole
+       Protocol ConnectionProtocol
+       IsSSL    bool
+
+       // metrics
+       Metrics *ConnectionMetrics
+}
+
 type TrafficAnalyzer struct {
-       existingProcesses map[int32][]api.ProcessInterface
+       analyzeContext *AnalyzerContext
        // used to find local same with remote address
        // the connect request(local:a -> remote:b) same with accept 
address(remote:a -> local:b)
        // key: localIP:port+RemoteIP+port
@@ -59,9 +88,9 @@ type TrafficAnalyzer struct {
        localAddresses map[string]map[string]api.ProcessInterface
 }
 
-func NewTrafficAnalyzer(processes map[int32][]api.ProcessInterface) 
*TrafficAnalyzer {
+func (c *AnalyzerContext) NewTrafficAnalyzer() *TrafficAnalyzer {
        return &TrafficAnalyzer{
-               existingProcesses:             processes,
+               analyzeContext:                c,
                localWithPeerCache:            
make(map[LocalWithPeerAddress]*PidWithRole),
                peerAddressCache:              make(map[PeerAddress][]uint32),
                envoyAcceptClientAddressCache: 
make(map[PeerAddress]*AddressWithPid),
@@ -125,19 +154,33 @@ func (t *TrafficAnalyzer) 
CombineConnectionToTraffics(connections []*ConnectionC
        // combine all result
        result := make([]*ProcessTraffic, 0)
        for _, v := range pidMatchedTraffic {
-               if v.ContainsAnyTraffic() {
-                       result = append(result, v)
-               }
+               result = append(result, v)
        }
        for _, v := range pidToRemoteTraffic {
-               if v.ContainsAnyTraffic() {
-                       result = append(result, v)
-               }
+               result = append(result, v)
        }
 
        return result
 }
 
+func (t *ProcessTraffic) GenerateConnectionInfo() string {
+       localInfo := fmt.Sprintf("%s:%d(%d)", t.LocalIP, t.LocalPort, 
t.LocalPid)
+       if len(t.LocalProcesses) > 0 {
+               localInfo = t.generateProcessInfo(t.LocalProcesses[0])
+       }
+
+       remoteInfo := fmt.Sprintf("%s:%d(%d)", t.RemoteIP, t.RemotePort, 
t.RemotePid)
+       if len(t.RemoteProcesses) > 0 {
+               remoteInfo = t.generateProcessInfo(t.RemoteProcesses[0])
+       }
+       return fmt.Sprintf("%s -> %s", localInfo, remoteInfo)
+}
+
+func (t *ProcessTraffic) generateProcessInfo(p api.ProcessInterface) string {
+       return fmt.Sprintf("(%s)%s:%s:%s(%s:%d)(%d)", p.Entity().Layer, 
p.Entity().ServiceName,
+               p.Entity().InstanceName, p.Entity().ProcessName, t.LocalIP, 
t.LocalPort, t.LocalPid)
+}
+
 func (t *TrafficAnalyzer) tryingToGenerateTheRoleWhenRemotePidCannotFound(con 
*ConnectionContext) {
        if con.Role != ConnectionRoleUnknown {
                return
@@ -163,34 +206,21 @@ func (t *TrafficAnalyzer) 
tryingToGenerateTheRoleWhenRemotePidCannotFound(con *C
 func (t *TrafficAnalyzer) generateOrCombineTraffic(traffic *ProcessTraffic, 
con *ConnectionContext, remotePid uint32) *ProcessTraffic {
        if traffic == nil {
                traffic = &ProcessTraffic{
-                       analyzer: t,
+                       Analyzer: t,
 
                        LocalPid:       con.LocalPid,
                        LocalProcesses: con.LocalProcesses,
                        LocalIP:        con.LocalIP,
                        LocalPort:      con.LocalPort,
 
-                       ConnectionRole: con.Role,
-
-                       WriteCounter:            NewSocketDataCounter(),
-                       ReadCounter:             NewSocketDataCounter(),
-                       WriteRTTCounter:         NewSocketDataCounter(),
-                       ConnectCounter:          NewSocketDataCounter(),
-                       CloseCounter:            NewSocketDataCounter(),
-                       RetransmitCounter:       NewSocketDataCounter(),
-                       DropCounter:             NewSocketDataCounter(),
-                       WriteRTTHistogram:       
NewSocketDataHistogram(HistogramDataUnitUS),
-                       WriteExeTimeHistogram:   
NewSocketDataHistogram(HistogramDataUnitNS),
-                       ReadExeTimeHistogram:    
NewSocketDataHistogram(HistogramDataUnitNS),
-                       ConnectExeTimeHistogram: 
NewSocketDataHistogram(HistogramDataUnitNS),
-                       CloseExeTimeHistogram:   
NewSocketDataHistogram(HistogramDataUnitNS),
+                       Metrics: t.analyzeContext.NewConnectionMetrics(),
                }
        }
        if len(traffic.LocalProcesses) == 0 && len(con.LocalProcesses) > 0 {
                traffic.LocalProcesses = con.LocalProcesses
        }
-       if traffic.ConnectionRole == ConnectionRoleUnknown && con.Role != 
ConnectionRoleUnknown {
-               traffic.ConnectionRole = con.Role
+       if traffic.Role == ConnectionRoleUnknown && con.Role != 
ConnectionRoleUnknown {
+               traffic.Role = con.Role
        }
        if traffic.Protocol == ConnectionProtocolUnknown && con.Protocol != 
ConnectionProtocolUnknown {
                traffic.Protocol = con.Protocol
@@ -206,23 +236,9 @@ func (t *TrafficAnalyzer) generateOrCombineTraffic(traffic 
*ProcessTraffic, con
        traffic.RemoteIP = con.RemoteIP
        traffic.RemotePort = con.RemotePort
 
-       traffic.WriteCounter.Increase(con.WriteCounter.CalculateIncrease())
-       traffic.ReadCounter.Increase(con.ReadCounter.CalculateIncrease())
-       
traffic.WriteRTTCounter.Increase(con.WriteRTTCounter.CalculateIncrease())
-       traffic.RetransmitCounter.Increase(con.RetransmitCounter)
-       traffic.DropCounter.Increase(con.DropCounter)
-       
traffic.WriteRTTHistogram.Increase(con.WriteRTTHistogram.CalculateIncrease())
-       
traffic.WriteExeTimeHistogram.Increase(con.WriteExeTimeHistogram.CalculateIncrease())
-       
traffic.ReadExeTimeHistogram.Increase(con.ReadExeTimeHistogram.CalculateIncrease())
-
-       if con.FlushDataCount == 0 && con.ConnectExecuteTime > 0 {
-               traffic.ConnectCounter.IncreaseByValue(0, 1, 
con.ConnectExecuteTime)
-               
traffic.ConnectExeTimeHistogram.IncreaseByValue(con.ConnectExecuteTime)
-       }
-       if con.FlushDataCount == 0 && con.CloseExecuteTime > 0 {
-               traffic.CloseCounter.IncreaseByValue(0, 1, con.CloseExecuteTime)
-               
traffic.CloseExeTimeHistogram.IncreaseByValue(con.CloseExecuteTime)
-       }
+       // flush connection metrics
+       traffic.Metrics.FlushMetrics(con)
+
        con.FlushDataCount++
        return traffic
 }
@@ -414,7 +430,7 @@ func (t *TrafficAnalyzer) 
findRemotePidWhenMeshEnvironment(con *ConnectionContex
 }
 
 func (t *TrafficAnalyzer) findSameInstanceMeshDP(entity *api.ProcessEntity) 
uint32 {
-       for _, psList := range t.existingProcesses {
+       for _, psList := range t.analyzeContext.processes {
                for _, p := range psList {
                        if p.Entity().Layer == layerMeshDP && 
p.Entity().ServiceName == entity.ServiceName && p.Entity().InstanceName == 
entity.InstanceName {
                                name, err := p.ExeName()
diff --git a/pkg/profiling/task/network/analyze/layer4/events.go 
b/pkg/profiling/task/network/analyze/layer4/events.go
new file mode 100644
index 0000000..641c2fb
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/layer4/events.go
@@ -0,0 +1,88 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package layer4
+
+import (
+       "encoding/json"
+
+       "github.com/apache/skywalking-rover/pkg/logger"
+       
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
+
+       "github.com/sirupsen/logrus"
+)
+
+var log = logger.GetLogger("profiling", "task", "network", "layer4")
+
+// SocketExceptionOperationEvent Socket have been retransmitted/drop the 
package event
+type SocketExceptionOperationEvent struct {
+       Pid            uint32
+       SocketFamily   uint32
+       RemoteAddrV4   uint32
+       RemoteAddrV6   [16]uint8
+       RemoteAddrPort uint32
+       Type           base.SocketExceptionOperationType
+}
+
+func (l *Listener) handleSocketExceptionOperationEvent(data interface{}) {
+       event := data.(*SocketExceptionOperationEvent)
+       l.socketExceptionOperationLock.Lock()
+       defer l.socketExceptionOperationLock.Unlock()
+
+       key := SocketBasicKey{
+               Pid:          event.Pid,
+               Family:       event.SocketFamily,
+               RemoteAddrV4: event.RemoteAddrV4,
+               RemoteAddrV6: event.RemoteAddrV6,
+               RemotePort:   event.RemoteAddrPort,
+       }
+       exceptionValue := l.socketExceptionStatics[key]
+       if exceptionValue == nil {
+               exceptionValue = &SocketExceptionValue{}
+               l.socketExceptionStatics[key] = exceptionValue
+       }
+
+       switch event.Type {
+       case base.SocketExceptionOperationRetransmit:
+               exceptionValue.RetransmitCount++
+       case base.SocketExceptionOperationDrop:
+               exceptionValue.DropCount++
+       default:
+               log.Warnf("unknown socket exception operation type: %d", 
event.Type)
+       }
+
+       if log.Enable(logrus.DebugLevel) {
+               marshal, _ := json.Marshal(event)
+               log.Debugf("found socket exception operation event: %s", 
string(marshal))
+       }
+}
+
+type SocketBasicKey struct {
+       Pid          uint32
+       Family       uint32
+       RemoteAddrV4 uint32
+       RemoteAddrV6 [16]uint8
+       RemotePort   uint32
+       LocalAddrV4  uint32
+       LocalAddrV6  [16]uint8
+       LocalPort    uint32
+}
+
+type SocketExceptionValue struct {
+       DropCount       int
+       RetransmitCount int
+}
diff --git a/pkg/profiling/task/network/analyze/layer4/listener.go 
b/pkg/profiling/task/network/analyze/layer4/listener.go
new file mode 100644
index 0000000..88db514
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/layer4/listener.go
@@ -0,0 +1,402 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package layer4
+
+import (
+       "fmt"
+       "net"
+       "sync"
+       "unsafe"
+
+       "github.com/sirupsen/logrus"
+
+       "github.com/apache/skywalking-rover/pkg/process/api"
+       
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
+       "github.com/apache/skywalking-rover/pkg/profiling/task/network/bpf"
+       "github.com/apache/skywalking-rover/pkg/tools"
+
+       v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+
+       "golang.org/x/sys/unix"
+)
+
+var Name = "layer4"
+
+type Listener struct {
+       // socket retransmit/drop
+       socketExceptionStatics       map[SocketBasicKey]*SocketExceptionValue
+       socketExceptionOperationLock sync.Mutex
+}
+
+func NewListener() *Listener {
+       return &Listener{
+               socketExceptionStatics: 
make(map[SocketBasicKey]*SocketExceptionValue),
+       }
+}
+
+func (l *Listener) Name() string {
+       return Name
+}
+
+func (l *Listener) GenerateMetrics() base.ListenerMetrics {
+       return NewLayer4Metrics()
+}
+
+func (l *Listener) RegisterBPFEvents(bpfLoader *bpf.Loader) {
+       bpfLoader.ReadEventAsync(bpfLoader.SocketExceptionOperationEventQueue, 
l.handleSocketExceptionOperationEvent, func() interface{} {
+               return &SocketExceptionOperationEvent{}
+       })
+}
+
+func (l *Listener) ReceiveNewConnection(ctx *base.ConnectionContext, event 
*base.SocketConnectEvent) {
+       // update the connection execute time
+       l.getMetrics(ctx.Metrics).ConnectExecuteTime = event.ExeTime
+}
+
+func (l *Listener) ReceiveCloseConnection(ctx *base.ConnectionContext, event 
*base.SocketCloseEvent) {
+       layer4 := l.getMetrics(ctx.Metrics)
+       // data transmit counters
+       layer4.WriteCounter.UpdateToCurrent(event.WriteBytes, event.WriteCount, 
event.WriteExeTime)
+       layer4.ReadCounter.UpdateToCurrent(event.ReadBytes, event.ReadCount, 
event.ReadExeTime)
+       layer4.WriteRTTCounter.UpdateToCurrent(0, event.WriteRTTCount, 
event.WriteRTTExeTime)
+
+       // connection close execute time
+       layer4.CloseExecuteTime = event.ExeTime
+}
+
+func (l *Listener) PreFlushConnectionMetrics(ccs []*base.ConnectionWithBPF, 
bpfLoader *bpf.Loader) error {
+       // rebuild to the map for helping quick search correlate 
ConnectionContext
+       keyWithContext := make(map[string]*base.ConnectionContext)
+       for _, cc := range ccs {
+               // ready to flush histograms
+               connection := cc.Connection
+               layer4 := l.getMetrics(connection.Metrics)
+               // basic counter update
+               activeConnection := cc.ActiveInBPF
+               if activeConnection != nil {
+                       
layer4.WriteCounter.UpdateToCurrent(activeConnection.WriteBytes, 
activeConnection.WriteCount, activeConnection.WriteExeTime)
+                       
layer4.ReadCounter.UpdateToCurrent(activeConnection.ReadBytes, 
activeConnection.ReadCount, activeConnection.ReadExeTime)
+                       layer4.WriteRTTCounter.UpdateToCurrent(0, 
activeConnection.WriteRTTCount, activeConnection.WriteRTTExeTime)
+               }
+               // build cache
+               keyWithContext[l.generateConID(connection.ConnectionID, 
connection.RandomID)] = connection
+
+               if log.Enable(logrus.DebugLevel) {
+                       log.Debugf("found connection: %d, %s relation: 
%s:%d(%d) -> %s:%d, protocol: %s, is_ssl: %t, is_closed: %t, write: %d 
bytes/%d, read: %d bytes/%d",
+                               connection.ConnectionID, 
connection.Role.String(),
+                               connection.LocalIP, connection.LocalPort, 
connection.LocalPid, connection.RemoteIP, connection.RemotePort,
+                               connection.Protocol.String(), connection.IsSSL, 
connection.ConnectionClosed, layer4.WriteCounter.Cur.Bytes,
+                               layer4.WriteCounter.Cur.Count, 
layer4.ReadCounter.Cur.Bytes, layer4.ReadCounter.Cur.Count)
+               }
+       }
+
+       var key HistogramDataKey
+       var count uint32
+       histogramIt := bpfLoader.SocketConnectionStatsHistogram.Iterate()
+       // for-each the stats map
+       for histogramIt.Next(&key, &count) {
+               // if it's not relate to the ConnectionContext just ignore
+               cc := keyWithContext[l.generateConID(key.ConnectionID, 
key.RandomID)]
+               if cc == nil {
+                       continue
+               }
+               layer4 := l.getMetrics(cc.Metrics)
+
+               // add the histogram data
+               var histogram *SocketDataHistogramWithHistory
+               if key.DataDirection == base.SocketDataDirectionEgress {
+                       if key.DataType == base.SocketDataStaticsTypeExeTime {
+                               histogram = layer4.WriteExeTimeHistogram
+                       } else if key.DataType == base.SocketDataStaticsTypeRTT 
{
+                               histogram = layer4.WriteRTTHistogram
+                       }
+               } else if key.DataDirection == base.SocketDataDirectionIngress {
+                       histogram = layer4.ReadExeTimeHistogram
+               }
+               if histogram == nil {
+                       log.Warnf("unknown the histogram data: %v", cc)
+                       continue
+               }
+               histogram.UpdateToCurrent(key.Bucket, count)
+
+               // delete the stats if the connection already closed
+               if cc.ConnectionClosed {
+                       if err := 
bpfLoader.SocketConnectionStatsHistogram.Delete(key); err != nil {
+                               log.Warnf("delete the connection stats failure: 
%v", err)
+                       }
+               }
+       }
+
+       // all the exception operations to the context
+       exceptionContexts := l.cleanAndGetAllExceptionContexts()
+       l.combineExceptionToConnections(keyWithContext, exceptionContexts)
+       return nil
+}
+
+func (l *Listener) PostFlushConnectionMetrics(ccs []*base.ConnectionContext) {
+       for _, connection := range ccs {
+               metrics := l.getMetrics(connection.Metrics)
+
+               // refresh counters
+               metrics.WriteCounter.RefreshCurrent()
+               metrics.ReadCounter.RefreshCurrent()
+               metrics.WriteRTTCounter.RefreshCurrent()
+               metrics.WriteRTTHistogram.RefreshCurrent()
+               metrics.WriteExeTimeHistogram.RefreshCurrent()
+               metrics.ReadExeTimeHistogram.RefreshCurrent()
+               metrics.ConnectCounter.RefreshCurrent()
+               metrics.CloseCounter.RefreshCurrent()
+               metrics.ConnectExeTimeHistogram.RefreshCurrent()
+               metrics.CloseExeTimeHistogram.RefreshCurrent()
+               metrics.RetransmitCounter.RefreshCurrent()
+               metrics.DropCounter.RefreshCurrent()
+       }
+}
+
+func (l *Listener) FlushMetrics(traffics []*base.ProcessTraffic, builder 
*base.MetricsBuilder) {
+       l.logTheMetricsConnections(traffics)
+
+       metricsPrefix := builder.MetricPrefix()
+       for _, traffic := range traffics {
+               metrics := traffic.Metrics.GetMetrics(Name).(*Metrics)
+               for _, p := range traffic.LocalProcesses {
+                       collection := make([]*v3.MeterData, 0)
+                       collection = l.appendCounterValues(collection, 
metricsPrefix, "write", traffic, p, metrics.WriteCounter)
+                       collection = l.appendCounterValues(collection, 
metricsPrefix, "read", traffic, p, metrics.ReadCounter)
+                       collection = l.appendCounterValues(collection, 
metricsPrefix, "write_rtt", traffic, p, metrics.WriteRTTCounter)
+                       collection = l.appendCounterValues(collection, 
metricsPrefix, "connect", traffic, p, metrics.ConnectCounter)
+                       collection = l.appendCounterValues(collection, 
metricsPrefix, "close", traffic, p, metrics.CloseCounter)
+                       collection = l.appendCounterValues(collection, 
metricsPrefix, "retransmit", traffic, p, metrics.RetransmitCounter)
+                       collection = l.appendCounterValues(collection, 
metricsPrefix, "drop", traffic, p, metrics.DropCounter)
+
+                       collection = l.appendHistogramValue(collection, 
metricsPrefix, "write_rtt", traffic, p, metrics.WriteRTTHistogram)
+                       collection = l.appendHistogramValue(collection, 
metricsPrefix, "write_exe_time", traffic, p, metrics.WriteExeTimeHistogram)
+                       collection = l.appendHistogramValue(collection, 
metricsPrefix, "read_exe_time", traffic, p, metrics.ReadExeTimeHistogram)
+                       collection = l.appendHistogramValue(collection, 
metricsPrefix, "connect_exe_time", traffic, p, metrics.ConnectExeTimeHistogram)
+                       collection = l.appendHistogramValue(collection, 
metricsPrefix, "close_exe_time", traffic, p, metrics.CloseExeTimeHistogram)
+
+                       if len(collection) == 0 {
+                               continue
+                       }
+
+                       builder.AppendMetrics(p.Entity().ServiceName, 
p.Entity().InstanceName, collection)
+               }
+       }
+}
+
+func (l *Listener) logTheMetricsConnections(traffics []*base.ProcessTraffic) {
+       if !log.Enable(logrus.DebugLevel) {
+               return
+       }
+       for _, traffic := range traffics {
+               side := traffic.Role.String()
+               layer4 := l.getMetrics(traffic.Metrics)
+               log.Debugf("connection layer4 analyze result: %s : %s, 
protocol: %s, is SSL: %t, write: %d bytes/%d, read: %d bytes/%d",
+                       side, traffic.GenerateConnectionInfo(), 
traffic.Protocol.String(), traffic.IsSSL, layer4.WriteCounter.Cur.Bytes, 
layer4.WriteCounter.Cur.Count,
+                       layer4.ReadCounter.Cur.Bytes, 
layer4.ReadCounter.Cur.Count)
+       }
+}
+
+func (l *Listener) generateConID(conID, randomID uint64) string {
+       return fmt.Sprintf("%d_%d", conID, randomID)
+}
+
+func (l *Listener) cleanAndGetAllExceptionContexts() 
map[SocketBasicKey]*SocketExceptionValue {
+       l.socketExceptionOperationLock.Lock()
+       defer l.socketExceptionOperationLock.Unlock()
+
+       result := l.socketExceptionStatics
+       l.socketExceptionStatics = 
make(map[SocketBasicKey]*SocketExceptionValue)
+       return result
+}
+
+func (l *Listener) combineExceptionToConnections(ccs 
map[string]*base.ConnectionContext, exps 
map[SocketBasicKey]*SocketExceptionValue) {
+       for key, value := range exps {
+               var remotePort, localPort = uint16(key.RemotePort), 
uint16(key.LocalPort)
+               var remoteIP, localIP string
+
+               if key.Family == unix.AF_INET {
+                       remoteIP = parseAddressV4(key.RemoteAddrV4)
+                       localIP = parseAddressV4(key.LocalAddrV4)
+               } else if key.Family == unix.AF_INET6 {
+                       remoteIP = parseAddressV6(key.RemoteAddrV6)
+                       localIP = parseAddressV6(key.LocalAddrV6)
+               } else {
+                       continue
+               }
+
+               var firstRemoteMatch *base.ConnectionContext
+               var foundAllAddrMatch bool
+               for _, cc := range ccs {
+                       // only add to the first matches
+                       if cc.RemoteIP == remoteIP && cc.RemotePort == 
remotePort {
+                               firstRemoteMatch = cc
+                               if cc.LocalIP == localIP && cc.LocalPort == 
localPort {
+                                       
l.mergeExceptionToAppointConnection(value, cc)
+                                       foundAllAddrMatch = true
+                                       break
+                               }
+                       }
+               }
+
+               // if only remote address match, then just add to the first one
+               if !foundAllAddrMatch && firstRemoteMatch != nil {
+                       l.mergeExceptionToAppointConnection(value, 
firstRemoteMatch)
+               }
+       }
+}
+
+func (l *Listener) mergeExceptionToAppointConnection(expCtx 
*SocketExceptionValue, conCtx *base.ConnectionContext) {
+       layer4 := l.getMetrics(conCtx.Metrics)
+       layer4.DropCounter.IncreaseToCurrent(NewSocketDataCounterWithValue(0, 
uint64(expCtx.DropCount), 0))
+       
layer4.RetransmitCounter.IncreaseToCurrent(NewSocketDataCounterWithValue(0, 
uint64(expCtx.RetransmitCount), 0))
+}
+
+func (l *Listener) appendCounterValues(metrics []*v3.MeterData, metricsPrefix, 
name string, traffic *base.ProcessTraffic,
+       local api.ProcessInterface, counter *SocketDataCounterWithHistory) 
[]*v3.MeterData {
+       metric := counter.Cur
+       if !metric.NotEmpty() {
+               return metrics
+       }
+
+       count := float64(metric.Count)
+       metrics = append(metrics, l.buildSingleValue(metricsPrefix, 
name+"_counts_counter", traffic, local, count))
+       if metric.Bytes > 0 {
+               metrics = append(metrics, l.buildSingleValue(metricsPrefix, 
name+"_bytes_counter", traffic, local, float64(metric.Bytes)))
+       }
+       if metric.ExeTime > 0 {
+               metrics = append(metrics, l.buildSingleValue(metricsPrefix, 
name+"_exe_time_counter", traffic, local, float64(metric.ExeTime)/count))
+       }
+       return metrics
+}
+
+func (l *Listener) appendHistogramValue(metrics []*v3.MeterData, 
metricsPrefix, name string, traffic *base.ProcessTraffic,
+       local api.ProcessInterface, histogram *SocketDataHistogramWithHistory) 
[]*v3.MeterData {
+       data := histogram.Cur
+       if !data.NotEmpty() {
+               return metrics
+       }
+
+       role, labels := l.buildBasicMeterLabels(traffic, local)
+       values := make([]*v3.MeterBucketValue, 0)
+       for bucket, count := range data.Buckets {
+               var bucketInx = int(bucket)
+               if bucketInx >= SocketHistogramBucketsCount {
+                       bucketInx = SocketHistogramBucketsCount - 1
+               }
+               var buckets []float64
+               if data.Unit == HistogramDataUnitUS {
+                       buckets = SocketHistogramBucketsUs
+               } else {
+                       buckets = SocketHistogramBucketsNs
+               }
+               values = append(values, &v3.MeterBucketValue{
+                       Bucket: buckets[bucketInx],
+                       Count:  int64(count),
+               })
+       }
+
+       return append(metrics, &v3.MeterData{
+               Metric: &v3.MeterData_Histogram{
+                       Histogram: &v3.MeterHistogram{
+                               Name:   fmt.Sprintf("%s%s_%s_histogram", 
metricsPrefix, role.String(), name),
+                               Labels: labels,
+                               Values: values,
+                       },
+               },
+       })
+}
+
+func (l *Listener) buildSingleValue(prefix, name string, traffic 
*base.ProcessTraffic, local api.ProcessInterface, val float64) *v3.MeterData {
+       role, labels := l.buildBasicMeterLabels(traffic, local)
+
+       return &v3.MeterData{
+               Metric: &v3.MeterData_SingleValue{
+                       SingleValue: &v3.MeterSingleValue{
+                               Name:   fmt.Sprintf("%s%s_%s", prefix, 
role.String(), name),
+                               Labels: labels,
+                               Value:  val,
+                       },
+               },
+       }
+}
+
+func (l *Listener) buildBasicMeterLabels(traffic *base.ProcessTraffic, local 
api.ProcessInterface) (base.ConnectionRole, []*v3.Label) {
+       curRole := traffic.Role
+       // add the default role
+       if curRole == base.ConnectionRoleUnknown {
+               curRole = base.ConnectionRoleClient
+       }
+       labels := make([]*v3.Label, 0)
+
+       // two pair process/address info
+       labels = l.appendMeterValue(labels, fmt.Sprintf("%s_process_id", 
curRole.String()), local.ID())
+       labels = l.appendRemoteAddressInfo(labels, traffic, 
curRole.Revert().String(), local)
+
+       labels = l.appendMeterValue(labels, "side", curRole.String())
+
+       // protocol and ssl
+       labels = l.appendMeterValue(labels, "protocol", 
traffic.Protocol.String())
+       labels = l.appendMeterValue(labels, "is_ssl", fmt.Sprintf("%t", 
traffic.IsSSL))
+       return curRole, labels
+}
+
+func (l *Listener) appendRemoteAddressInfo(labels []*v3.Label, traffic 
*base.ProcessTraffic, prefix string, local api.ProcessInterface) []*v3.Label {
+       if len(traffic.RemoteProcesses) != 0 {
+               for _, p := range traffic.RemoteProcesses {
+                       // only match with same service instance
+                       if local.Entity().ServiceName == p.Entity().ServiceName 
&&
+                               local.Entity().InstanceName == 
p.Entity().InstanceName {
+                               return l.appendMeterValue(labels, 
prefix+"_process_id", p.ID())
+                       }
+               }
+       }
+
+       if tools.IsLocalHostAddress(traffic.RemoteIP) || 
traffic.Analyzer.IsLocalAddressInCache(traffic.RemoteIP) {
+               return l.appendMeterValue(labels, prefix+"_local", "true")
+       }
+
+       return l.appendMeterValue(labels, prefix+"_address", 
fmt.Sprintf("%s:%d", traffic.RemoteIP, traffic.RemotePort))
+}
+
+func (l *Listener) appendMeterValue(labels []*v3.Label, name, value string) 
[]*v3.Label {
+       return append(labels, &v3.Label{
+               Name:  name,
+               Value: value,
+       })
+}
+
+func (l *Listener) getMetrics(connectionMetrics *base.ConnectionMetrics) 
*Metrics {
+       return connectionMetrics.GetMetrics(Name).(*Metrics)
+}
+
+type HistogramDataKey struct {
+       ConnectionID  uint64
+       RandomID      uint64
+       DataDirection base.SocketDataDirection
+       DataType      base.SocketDataStaticsType
+       Bucket        uint64
+}
+
+func parseAddressV4(val uint32) string {
+       return net.IP((*(*[net.IPv4len]byte)(unsafe.Pointer(&val)))[:]).String()
+}
+
+func parseAddressV6(val [16]uint8) string {
+       return net.IP((*(*[net.IPv6len]byte)(unsafe.Pointer(&val)))[:]).String()
+}
diff --git a/pkg/profiling/task/network/analyze/layer4/metrics.go 
b/pkg/profiling/task/network/analyze/layer4/metrics.go
new file mode 100644
index 0000000..f6a6658
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/layer4/metrics.go
@@ -0,0 +1,276 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package layer4
+
+import 
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
+
+type Metrics struct {
+       // basic statics
+       // read/write
+       WriteCounter *SocketDataCounterWithHistory
+       ReadCounter  *SocketDataCounterWithHistory
+       // write RTT
+       WriteRTTCounter *SocketDataCounterWithHistory
+
+       // histograms
+       // write execute time and RTT
+       WriteRTTHistogram     *SocketDataHistogramWithHistory
+       WriteExeTimeHistogram *SocketDataHistogramWithHistory
+       // read execute time
+       ReadExeTimeHistogram *SocketDataHistogramWithHistory
+
+       // the connection connect or close execute time
+       ConnectExecuteTime      uint64
+       CloseExecuteTime        uint64
+       ConnectCounter          *SocketDataCounterWithHistory
+       CloseCounter            *SocketDataCounterWithHistory
+       ConnectExeTimeHistogram *SocketDataHistogramWithHistory
+       CloseExeTimeHistogram   *SocketDataHistogramWithHistory
+
+       // exception counters
+       RetransmitCounter *SocketDataCounterWithHistory
+       DropCounter       *SocketDataCounterWithHistory
+}
+
+func NewLayer4Metrics() *Metrics {
+       return &Metrics{
+               WriteCounter:            NewSocketDataCounterWithHistory(),
+               ReadCounter:             NewSocketDataCounterWithHistory(),
+               WriteRTTCounter:         NewSocketDataCounterWithHistory(),
+               WriteRTTHistogram:       
NewSocketDataHistogramWithHistory(HistogramDataUnitUS),
+               WriteExeTimeHistogram:   
NewSocketDataHistogramWithHistory(HistogramDataUnitNS),
+               ReadExeTimeHistogram:    
NewSocketDataHistogramWithHistory(HistogramDataUnitNS),
+               ConnectCounter:          NewSocketDataCounterWithHistory(),
+               ConnectExeTimeHistogram: 
NewSocketDataHistogramWithHistory(HistogramDataUnitNS),
+               CloseCounter:            NewSocketDataCounterWithHistory(),
+               CloseExeTimeHistogram:   
NewSocketDataHistogramWithHistory(HistogramDataUnitNS),
+               RetransmitCounter:       NewSocketDataCounterWithHistory(),
+               DropCounter:             NewSocketDataCounterWithHistory(),
+       }
+}
+
+func (l *Metrics) FlushMetrics(connection *base.ConnectionContext) {
+       metrics := connection.Metrics.GetMetrics(Name).(*Metrics)
+
+       
l.WriteCounter.IncreaseToCurrent(metrics.WriteCounter.CalculateIncrease())
+       l.ReadCounter.IncreaseToCurrent(metrics.ReadCounter.CalculateIncrease())
+       
l.WriteRTTCounter.IncreaseToCurrent(metrics.WriteRTTCounter.CalculateIncrease())
+
+       
l.WriteRTTHistogram.IncreaseToCurrent(metrics.WriteRTTHistogram.CalculateIncrease())
+       
l.WriteExeTimeHistogram.IncreaseToCurrent(metrics.WriteExeTimeHistogram.CalculateIncrease())
+       
l.ReadExeTimeHistogram.IncreaseToCurrent(metrics.ReadExeTimeHistogram.CalculateIncrease())
+
+       
l.RetransmitCounter.IncreaseToCurrent(metrics.RetransmitCounter.CalculateIncrease())
+       l.DropCounter.IncreaseToCurrent(metrics.DropCounter.CalculateIncrease())
+
+       if connection.FlushDataCount == 0 && metrics.ConnectExecuteTime > 0 {
+               
l.ConnectCounter.IncreaseToCurrent(NewSocketDataCounterWithValue(0, 1, 
metrics.ConnectExecuteTime))
+               
l.ConnectExeTimeHistogram.Cur.IncreaseByValue(metrics.ConnectExecuteTime)
+       }
+       if connection.FlushDataCount == 0 && metrics.CloseExecuteTime > 0 {
+               
l.CloseCounter.IncreaseToCurrent(NewSocketDataCounterWithValue(0, 1, 
metrics.CloseExecuteTime))
+               
l.CloseExeTimeHistogram.Cur.IncreaseByValue(metrics.CloseExecuteTime)
+       }
+}
+
+type SocketDataCounter struct {
+       Bytes   uint64
+       Count   uint64
+       ExeTime uint64
+}
+
+func NewSocketDataCounter() *SocketDataCounter {
+       return &SocketDataCounter{}
+}
+
+func NewSocketDataCounterWithValue(bytes, count, exeTime uint64) 
*SocketDataCounter {
+       ret := &SocketDataCounter{}
+       ret.IncreaseByValue(bytes, count, exeTime)
+       return ret
+}
+
+func (s *SocketDataCounter) Increase(d *SocketDataCounter) {
+       s.IncreaseByValue(d.Bytes, d.Count, d.ExeTime)
+}
+
+func (s *SocketDataCounter) IncreaseByValue(bytes, count, exeTime uint64) {
+       s.Bytes += bytes
+       s.Count += count
+       s.ExeTime += exeTime
+}
+
+func (s *SocketDataCounter) NotEmpty() bool {
+       return s.Count > 0
+}
+
+// SocketDataCounterWithHistory means the socket send/receive data metrics
+type SocketDataCounterWithHistory struct {
+       Pre *SocketDataCounter
+       Cur *SocketDataCounter
+}
+
+func NewSocketDataCounterWithHistory() *SocketDataCounterWithHistory {
+       return &SocketDataCounterWithHistory{
+               Pre: NewSocketDataCounter(),
+               Cur: NewSocketDataCounter(),
+       }
+}
+
+func (c *SocketDataCounterWithHistory) RefreshCurrent() {
+       c.Pre = c.Cur
+       c.Cur = NewSocketDataCounterWithValue(c.Cur.Bytes, c.Cur.Count, 
c.Cur.ExeTime)
+}
+
+func (c *SocketDataCounterWithHistory) UpdateToCurrent(bytes, count, exeTime 
uint64) {
+       c.Pre = c.Cur
+       c.Cur = &SocketDataCounter{
+               Bytes:   bytes,
+               Count:   count,
+               ExeTime: exeTime,
+       }
+}
+
+func (c *SocketDataCounterWithHistory) IncreaseToCurrent(other 
*SocketDataCounter) {
+       c.Cur.Increase(other)
+}
+
+func (c *SocketDataCounterWithHistory) CalculateIncrease() *SocketDataCounter {
+       return &SocketDataCounter{
+               Bytes:   subtractionValue(c.Cur.Bytes, c.Pre.Bytes),
+               Count:   subtractionValue(c.Cur.Count, c.Pre.Count),
+               ExeTime: subtractionValue(c.Cur.ExeTime, c.Pre.ExeTime),
+       }
+}
+
+// SocketHistogramBucketsNs means the histogram bucket: 0ms, 0.01ms, 0.05ms, 
0.1ms, 0.5ms, 1ms, 1.2ms, 1.5ms, 1.7ms, 2ms,
+// 2.5ms, 3ms, 5ms, 7ms, 10ms, 13ms, 16ms, 20ms, 25ms, 30ms, 35ms, 40ms, 45ms, 
50ms, 70ms, 100ms, 150ms,
+// 200ms, 300ms, 500ms, 1s, 2s, 3s, 5s
+// value unit: ns
+var SocketHistogramBucketsNs = []float64{0, 10000, 50000, 100000, 500000, 
1000000, 1200000, 1500000, 1700000, 2000000,
+       2500000, 3000000, 5000000, 7000000, 10000000, 13000000, 16000000, 
20000000, 25000000, 30000000, 35000000, 40000000,
+       45000000, 50000000, 70000000, 100000000, 150000000, 200000000, 
300000000, 500000000, 1000000000, 2000000000,
+       3000000000, 5000000000}
+
+// SocketHistogramBucketsUs same with SocketHistogramBucketsNs, but the value 
unit: us
+var SocketHistogramBucketsUs = []float64{0, 10, 50, 100, 500, 1000, 1200, 
1500, 1700, 2000,
+       2500, 3000, 5000, 7000, 10000, 13000, 16000, 20000, 25000, 30000, 
35000, 40000,
+       45000, 50000, 70000, 100000, 150000, 200000, 300000, 500000, 1000000, 
2000000,
+       3000000, 5000000}
+var SocketHistogramBucketsCount = len(SocketHistogramBucketsNs)
+
+type SocketDataHistogram struct {
+       Unit    HistogramDataUnit
+       Buckets map[uint64]uint32
+}
+
+func (h *SocketDataHistogram) Overwrite(other *SocketDataHistogram) {
+       for k, v := range other.Buckets {
+               h.Buckets[k] = v
+       }
+}
+
+func (h *SocketDataHistogram) Update(bucket uint64, value uint32) {
+       h.Buckets[bucket] = value
+}
+
+func (h *SocketDataHistogram) Increase(other *SocketDataHistogram) {
+       for k, v := range other.Buckets {
+               h.Buckets[k] += v
+       }
+}
+
+func (h *SocketDataHistogram) IncreaseByValue(val uint64) {
+       floatVal := float64(val)
+       for inx, curVal := range SocketHistogramBucketsNs {
+               if inx > 0 && curVal > floatVal {
+                       h.Buckets[uint64(inx-1)]++
+                       return
+               }
+       }
+       h.Buckets[uint64(len(SocketHistogramBucketsNs)-1)]++
+}
+
+func (h *SocketDataHistogram) NotEmpty() bool {
+       for _, v := range h.Buckets {
+               if v > 0 {
+                       return true
+               }
+       }
+       return false
+}
+
+func NewSocketDataHistogram(unit HistogramDataUnit) *SocketDataHistogram {
+       buckets := make(map[uint64]uint32, SocketHistogramBucketsCount)
+       for i := 0; i < SocketHistogramBucketsCount; i++ {
+               buckets[uint64(i)] = 0
+       }
+       return &SocketDataHistogram{
+               Unit:    unit,
+               Buckets: buckets,
+       }
+}
+
+type HistogramDataUnit int
+
+const (
+       HistogramDataUnitNS HistogramDataUnit = 1
+       HistogramDataUnitUS HistogramDataUnit = 2
+)
+
+type SocketDataHistogramWithHistory struct {
+       Pre *SocketDataHistogram
+       Cur *SocketDataHistogram
+}
+
+func NewSocketDataHistogramWithHistory(unit HistogramDataUnit) 
*SocketDataHistogramWithHistory {
+       return &SocketDataHistogramWithHistory{
+               Pre: NewSocketDataHistogram(unit),
+               Cur: NewSocketDataHistogram(unit),
+       }
+}
+
+func (h *SocketDataHistogramWithHistory) RefreshCurrent() {
+       // storage the current value to the previous buckets
+       h.Pre.Overwrite(h.Cur)
+}
+
+func (h *SocketDataHistogramWithHistory) UpdateToCurrent(bucket uint64, val 
uint32) {
+       h.Cur.Update(bucket, val)
+}
+
+func (h *SocketDataHistogramWithHistory) IncreaseToCurrent(other 
*SocketDataHistogram) {
+       h.Cur.Increase(other)
+}
+
+func (h *SocketDataHistogramWithHistory) CalculateIncrease() 
*SocketDataHistogram {
+       histogram := NewSocketDataHistogram(h.Cur.Unit)
+       var increaseVal uint32
+       for curK, curV := range h.Cur.Buckets {
+               if increaseVal = curV - h.Pre.Buckets[curK]; increaseVal > 0 {
+                       histogram.Buckets[curK] = increaseVal
+               }
+       }
+       return histogram
+}
+
+func subtractionValue(v1, v2 uint64) uint64 {
+       if v1 > v2 {
+               return v1 - v2
+       }
+       return 0
+}
diff --git a/pkg/profiling/task/network/bpf/bpf.go 
b/pkg/profiling/task/network/bpf/bpf.go
new file mode 100644
index 0000000..2181f09
--- /dev/null
+++ b/pkg/profiling/task/network/bpf/bpf.go
@@ -0,0 +1,56 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package bpf
+
+import (
+       "github.com/apache/skywalking-rover/pkg/tools/btf"
+
+       "github.com/hashicorp/go-multierror"
+)
+
+// $BPF_CLANG and $BPF_CFLAGS are set by the Makefile.
+// nolint
+//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -target bpfel -cc 
$BPF_CLANG -cflags $BPF_CFLAGS bpf 
$REPO_ROOT/bpf/profiling/network/netmonitor.c -- -I$REPO_ROOT/bpf/include 
-D__TARGET_ARCH_x86
+
+type Loader struct {
+       *Linker
+       *bpfObjects
+}
+
+func NewLoader() (*Loader, error) {
+       objs := bpfObjects{}
+       if err := loadBpfObjects(&objs, btf.GetEBPFCollectionOptionsIfNeed()); 
err != nil {
+               return nil, err
+       }
+
+       return &Loader{
+               bpfObjects: &objs,
+               Linker:     NewLinker(),
+       }, nil
+}
+
+func (l *Loader) Close() error {
+       var err error
+       if e := l.bpfObjects.Close(); e != nil {
+               err = multierror.Append(err, e)
+       }
+       if e := l.Linker.Close(); e != nil {
+               err = multierror.Append(err, e)
+       }
+       return err
+}
diff --git a/pkg/profiling/task/network/linker.go 
b/pkg/profiling/task/network/bpf/linker.go
similarity index 98%
rename from pkg/profiling/task/network/linker.go
rename to pkg/profiling/task/network/bpf/linker.go
index c8cdf1e..7664918 100644
--- a/pkg/profiling/task/network/linker.go
+++ b/pkg/profiling/task/network/bpf/linker.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package network
+package bpf
 
 import (
        "bytes"
@@ -28,6 +28,7 @@ import (
 
        "golang.org/x/arch/x86/x86asm"
 
+       "github.com/apache/skywalking-rover/pkg/logger"
        "github.com/apache/skywalking-rover/pkg/tools"
        "github.com/apache/skywalking-rover/pkg/tools/elf"
 
@@ -38,6 +39,8 @@ import (
        "github.com/hashicorp/go-multierror"
 )
 
+var log = logger.GetLogger("profiling", "task", "network", "bpf")
+
 const defaultSymbolPrefix = "sys_"
 
 type LinkFunc func(symbol string, prog *ebpf.Program) (link.Link, error)
diff --git a/pkg/profiling/task/network/context.go 
b/pkg/profiling/task/network/context.go
deleted file mode 100644
index 3ebc2eb..0000000
--- a/pkg/profiling/task/network/context.go
+++ /dev/null
@@ -1,726 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package network
-
-import (
-       "context"
-       "encoding/json"
-       "errors"
-       "fmt"
-       "net"
-       "sync"
-       "unsafe"
-
-       cmap "github.com/orcaman/concurrent-map"
-
-       "github.com/sirupsen/logrus"
-
-       "github.com/hashicorp/go-multierror"
-
-       "github.com/cilium/ebpf"
-
-       "golang.org/x/sys/unix"
-
-       "github.com/apache/skywalking-rover/pkg/process/api"
-)
-
-type Context struct {
-       processes map[int32][]api.ProcessInterface
-
-       bpf    *bpfObjects // current bpf programs
-       linker *Linker
-
-       // standard syscall connections
-       activeConnections cmap.ConcurrentMap      // current activeConnections 
connections
-       closedConnections []*ConnectionContext    // closed connections'
-       flushClosedEvents chan *SocketCloseEvent  // connection have been 
closed, it is a queue to cache unknown active connections
-       sockParseQueue    chan *ConnectionContext // socket address parse queue
-
-       // socket retransmit/drop
-       socketExceptionStatics       map[SocketBasicKey]*SocketExceptionValue
-       socketExceptionOperationLock sync.Mutex
-}
-
-type SocketBasicKey struct {
-       Pid          uint32
-       Family       uint32
-       RemoteAddrV4 uint32
-       RemoteAddrV6 [16]uint8
-       RemotePort   uint32
-       LocalAddrV4  uint32
-       LocalAddrV6  [16]uint8
-       LocalPort    uint32
-}
-
-type SocketExceptionValue struct {
-       DropCount       int
-       RetransmitCount int
-}
-
-type ConnectionContext struct {
-       // basic metadata
-       ConnectionID     uint64
-       RandomID         uint64
-       LocalPid         uint32
-       SocketFD         uint32
-       LocalProcesses   []api.ProcessInterface
-       ConnectionClosed bool
-       Protocol         ConnectionProtocol
-       IsSSL            bool
-
-       // socket metadata
-       Role       ConnectionRole
-       LocalIP    string
-       LocalPort  uint16
-       RemoteIP   string
-       RemotePort uint16
-
-       // basic statics
-       // read/write
-       WriteCounter *SocketDataCounterWithHistory
-       ReadCounter  *SocketDataCounterWithHistory
-       // write RTT
-       WriteRTTCounter *SocketDataCounterWithHistory
-
-       // histograms
-       // write execute time and RTT
-       WriteRTTHistogram     *SocketDataHistogramWithHistory
-       WriteExeTimeHistogram *SocketDataHistogramWithHistory
-       // read execute time
-       ReadExeTimeHistogram *SocketDataHistogramWithHistory
-
-       // the connection connect or close execute time
-       ConnectExecuteTime uint64
-       CloseExecuteTime   uint64
-
-       // exception counters
-       RetransmitCounter *SocketDataCounter
-       DropCounter       *SocketDataCounter
-
-       // Flush the data content to the oap count
-       FlushDataCount int
-}
-
-func NewContext() *Context {
-       return &Context{
-               activeConnections:      cmap.New(),
-               closedConnections:      make([]*ConnectionContext, 0),
-               flushClosedEvents:      make(chan *SocketCloseEvent, 5000),
-               sockParseQueue:         make(chan *ConnectionContext, 5000),
-               processes:              make(map[int32][]api.ProcessInterface),
-               socketExceptionStatics: 
make(map[SocketBasicKey]*SocketExceptionValue),
-       }
-}
-
-func (c *Context) Init(bpf *bpfObjects, linker *Linker) {
-       c.bpf = bpf
-       c.linker = linker
-}
-
-func (c *Context) RegisterAllHandlers() {
-       // socket connect
-       c.linker.ReadEventAsync(c.bpf.SocketConnectionEventQueue, 
c.handleSocketConnectEvent, func() interface{} {
-               return &SocketConnectEvent{}
-       })
-       // socket close
-       c.linker.ReadEventAsync(c.bpf.SocketCloseEventQueue, 
c.handleSocketCloseEvent, func() interface{} {
-               return &SocketCloseEvent{}
-       })
-       // socket retransmit
-       c.linker.ReadEventAsync(c.bpf.SocketExceptionOperationEventQueue, 
c.handleSocketExceptionOperationEvent, func() interface{} {
-               return &SocketExceptionOperationEvent{}
-       })
-}
-
-func (c *Context) FlushAllConnection() ([]*ConnectionContext, error) {
-       // handling the unfinished close event
-       c.batchReProcessCachedCloseEvent()
-
-       // get all connection context and fill the metrics
-       allContexts := c.getAllConnectionWithContext()
-       c.fillConnectionMetrics(allContexts)
-
-       // all the exception operations to the context
-       exceptionContexts := c.cleanAndGetAllExceptionContexts()
-       // init all exception counters
-       for _, ctx := range allContexts {
-               ctx.DropCounter = NewSocketDataCounter()
-               ctx.RetransmitCounter = NewSocketDataCounter()
-       }
-       c.combineExceptionToConnections(allContexts, exceptionContexts)
-
-       return allContexts, nil
-}
-
-func (c *Context) StartSocketAddressParser(ctx context.Context) {
-       for i := 0; i < 2; i++ {
-               go c.handleSocketParseQueue(ctx)
-       }
-}
-
-func (c *Context) handleSocketParseQueue(ctx context.Context) {
-       for {
-               select {
-               case cc := <-c.sockParseQueue:
-                       socket, err := ParseSocket(cc.LocalPid, cc.SocketFD)
-                       if err != nil {
-                               // if the remote port of connection is empty, 
then this connection not available basically
-                               if cc.RemotePort == 0 {
-                                       log.Warnf("complete the socket error, 
pid: %d, fd: %d, error: %v", cc.LocalPid, cc.SocketFD, err)
-                               }
-                               continue
-                       }
-                       cc.LocalIP = socket.SrcIP
-                       cc.LocalPort = socket.SrcPort
-                       cc.RemoteIP = socket.DestIP
-                       cc.RemotePort = socket.DestPort
-               case <-ctx.Done():
-                       return
-               }
-       }
-}
-
-func (c *Context) combineExceptionToConnections(ccs []*ConnectionContext, exps 
map[SocketBasicKey]*SocketExceptionValue) {
-       for key, value := range exps {
-               var remotePort, localPort = uint16(key.RemotePort), 
uint16(key.LocalPort)
-               var remoteIP, localIP string
-
-               if key.Family == unix.AF_INET {
-                       remoteIP = parseAddressV4(key.RemoteAddrV4)
-                       localIP = parseAddressV4(key.LocalAddrV4)
-               } else if key.Family == unix.AF_INET6 {
-                       remoteIP = parseAddressV6(key.RemoteAddrV6)
-                       localIP = parseAddressV6(key.LocalAddrV6)
-               } else {
-                       continue
-               }
-
-               var firstRemoteMatch *ConnectionContext
-               var foundAllAddrMatch bool
-               for _, cc := range ccs {
-                       // only add to the first matches
-                       if cc.RemoteIP == remoteIP && cc.RemotePort == 
remotePort {
-                               firstRemoteMatch = cc
-                               if cc.LocalIP == localIP && cc.LocalPort == 
localPort {
-                                       
c.mergeExceptionToAppointConnection(value, cc)
-                                       foundAllAddrMatch = true
-                                       break
-                               }
-                       }
-               }
-
-               // if only remote address match, then just add to the first one
-               if !foundAllAddrMatch && firstRemoteMatch != nil {
-                       c.mergeExceptionToAppointConnection(value, 
firstRemoteMatch)
-               }
-       }
-}
-
-func (c *Context) mergeExceptionToAppointConnection(expCtx 
*SocketExceptionValue, conCtx *ConnectionContext) {
-       conCtx.DropCounter.IncreaseByValue(0, uint64(expCtx.DropCount), 0)
-       conCtx.RetransmitCounter.IncreaseByValue(0, 
uint64(expCtx.RetransmitCount), 0)
-}
-
-func (c *Context) cleanAndGetAllExceptionContexts() 
map[SocketBasicKey]*SocketExceptionValue {
-       c.socketExceptionOperationLock.Lock()
-       defer c.socketExceptionOperationLock.Unlock()
-
-       result := c.socketExceptionStatics
-       c.socketExceptionStatics = 
make(map[SocketBasicKey]*SocketExceptionValue)
-       return result
-}
-
-func (c *Context) getAllConnectionWithContext() []*ConnectionContext {
-       result := make([]*ConnectionContext, 0)
-       result = append(result, c.closedConnections...)
-       for _, con := range c.activeConnections.Items() {
-               result = append(result, con.(*ConnectionContext))
-       }
-
-       c.closedConnections = make([]*ConnectionContext, 0)
-       return result
-}
-
-type ActiveConnectionInBPF struct {
-       RandomID     uint64
-       Pid          uint32
-       SocketFD     uint32
-       Role         ConnectionRole
-       SocketFamily uint32
-
-       RemoteAddrV4   uint32
-       RemoteAddrV6   [16]uint8
-       RemoteAddrPort uint32
-       LocalAddrV4    uint32
-       LocalAddrV6    [16]uint8
-       LocalAddrPort  uint32
-
-       WriteBytes   uint64
-       WriteCount   uint64
-       WriteExeTime uint64
-       ReadBytes    uint64
-       ReadCount    uint64
-       ReadExeTime  uint64
-
-       WriteRTTCount   uint64
-       WriteRTTExeTime uint64
-
-       // Protocol analyze context
-       Protocol ConnectionProtocol
-       IsSSL    uint32
-
-       // the connect event is already sent
-       ConnectEventIsSent uint32
-}
-
-type HistogramDataKey struct {
-       ConnectionID  uint64
-       RandomID      uint64
-       DataDirection SocketDataDirection
-       DataType      SocketDataStaticsType
-       Bucket        uint64
-}
-
-func (c *Context) fillConnectionMetrics(ccs []*ConnectionContext) {
-       // rebuild to the map for helping quick search correlate 
ConnectionContext
-       keyWithContext := make(map[string]*ConnectionContext)
-       var activeConnection ActiveConnectionInBPF
-       closedConns := make([]string, 0)
-       for _, cc := range ccs {
-               connectionKey := c.generateConnectionKey(cc.ConnectionID, 
cc.RandomID)
-               // refresh the histogram for prepare to update the buckets
-               cc.WriteRTTHistogram.RefreshCurrent()
-               cc.WriteExeTimeHistogram.RefreshCurrent()
-               cc.ReadExeTimeHistogram.RefreshCurrent()
-               keyWithContext[connectionKey] = cc
-
-               // if connection not closed, then load the basic stats from bpf 
map
-               if !cc.ConnectionClosed {
-                       err := 
c.bpf.ActiveConnectionMap.Lookup(cc.ConnectionID, &activeConnection)
-
-                       if err != nil {
-                               if errors.Is(err, ebpf.ErrKeyNotExist) {
-                                       closedConns = append(closedConns, 
connectionKey)
-                               } else {
-                                       log.Warnf("lookup the active connection 
error, connection id: %d, error: %v", cc.ConnectionID, err)
-                               }
-                               continue
-                       }
-
-                       if log.Enable(logrus.DebugLevel) {
-                               marshal, _ := json.Marshal(activeConnection)
-                               log.Debugf("found the active connection, conid: 
%d, data: %s", cc.ConnectionID, string(marshal))
-                       }
-
-                       if cc.Role == ConnectionRoleUnknown && 
activeConnection.Role != ConnectionRoleUnknown {
-                               cc.Role = activeConnection.Role
-                       }
-                       if cc.Protocol == ConnectionProtocolUnknown && 
activeConnection.Protocol != ConnectionProtocolUnknown {
-                               cc.Protocol = activeConnection.Protocol
-                       }
-                       if !cc.IsSSL && activeConnection.IsSSL == 1 {
-                               cc.IsSSL = true
-                       }
-
-                       // update the role
-                       
cc.WriteCounter.UpdateToCurrent(activeConnection.WriteBytes, 
activeConnection.WriteCount, activeConnection.WriteExeTime)
-                       
cc.ReadCounter.UpdateToCurrent(activeConnection.ReadBytes, 
activeConnection.ReadCount, activeConnection.ReadExeTime)
-                       cc.WriteRTTCounter.UpdateToCurrent(0, 
activeConnection.WriteRTTCount, activeConnection.WriteRTTExeTime)
-               }
-       }
-       if len(closedConns) > 0 {
-               c.deleteConnectionOnly(closedConns)
-       }
-
-       // fill the histogram metrics
-       c.fillHistograms(keyWithContext)
-}
-
-func (c *Context) fillHistograms(keyWithContext map[string]*ConnectionContext) 
{
-       var key HistogramDataKey
-       var count uint32
-       histogramIt := c.bpf.SocketConnectionStatsHistogram.Iterate()
-       // for-each the stats map
-       for histogramIt.Next(&key, &count) {
-               // if it's not relate to the ConnectionContext just ignore
-               cc := keyWithContext[c.generateConnectionKey(key.ConnectionID, 
key.RandomID)]
-               if cc == nil {
-                       continue
-               }
-
-               // add the histogram data
-               var histogram *SocketDataHistogramWithHistory
-               if key.DataDirection == SocketDataDirectionEgress {
-                       if key.DataType == SocketDataStaticsTypeExeTime {
-                               histogram = cc.WriteExeTimeHistogram
-                       } else if key.DataType == SocketDataStaticsTypeRTT {
-                               histogram = cc.WriteRTTHistogram
-                       }
-               } else if key.DataDirection == SocketDataDirectionIngress {
-                       histogram = cc.ReadExeTimeHistogram
-               }
-               if histogram == nil {
-                       log.Warnf("unknown the histogram data: %v", cc)
-                       continue
-               }
-               histogram.UpdateToCurrent(key.Bucket, count)
-
-               // delete the stats if the connection already closed
-               if cc.ConnectionClosed {
-                       if err := 
c.bpf.SocketConnectionStatsHistogram.Delete(key); err != nil {
-                               log.Warnf("delete the connection stats failure: 
%v", err)
-                       }
-               }
-       }
-}
-
-// SocketConnectEvent Socket have been connection/accept event
-type SocketConnectEvent struct {
-       ConID        uint64
-       RandomID     uint64
-       ExeTime      uint64
-       NeedComplete uint32
-       Pid          uint32
-       FD           uint32
-       FuncName     uint32
-
-       // socket information if exists
-       Role           ConnectionRole
-       SocketFamily   uint32
-       RemoteAddrV4   uint32
-       RemoteAddrV6   [16]uint8
-       RemoteAddrPort uint32
-       LocalAddrV4    uint32
-       LocalAddrV6    [16]uint8
-       LocalAddrPort  uint32
-}
-
-func (c *Context) handleSocketConnectEvent(data interface{}) {
-       event := data.(*SocketConnectEvent)
-       processes := c.processes[int32(event.Pid)]
-       if len(processes) == 0 {
-               log.Warnf("get process connect event, but this process is don't 
need to monitor, pid: %d", event.Pid)
-               return
-       }
-
-       // build active connection information
-       con := c.newConnectionContext(event.ConID, event.RandomID, event.Pid, 
event.FD, processes, false)
-       con.ConnectExecuteTime = event.ExeTime
-       con.Role = event.Role
-       if event.NeedComplete == 0 {
-               con.RemotePort = uint16(event.RemoteAddrPort)
-               con.LocalPort = uint16(event.LocalAddrPort)
-               if event.SocketFamily == unix.AF_INET {
-                       con.LocalIP = parseAddressV4(event.LocalAddrV4)
-                       con.RemoteIP = parseAddressV4(event.RemoteAddrV4)
-               } else {
-                       con.LocalIP = parseAddressV6(event.LocalAddrV6)
-                       con.RemoteIP = parseAddressV6(event.RemoteAddrV6)
-               }
-       } else {
-               // if the remote address exists then setting it
-               if event.RemoteAddrPort != 0 {
-                       con.RemotePort = uint16(event.RemoteAddrPort)
-                       if event.SocketFamily == unix.AF_INET {
-                               con.RemoteIP = 
parseAddressV4(event.RemoteAddrV4)
-                       } else {
-                               con.RemoteIP = 
parseAddressV6(event.RemoteAddrV6)
-                       }
-               }
-               c.sockParseQueue <- con
-       }
-
-       // add to the context
-       c.saveActiveConnection(con)
-
-       if log.Enable(logrus.DebugLevel) {
-               marshal, _ := json.Marshal(event)
-               log.Debugf("found connect event: role: %s, %s:%d:%d -> %s:%d, 
json: %s", con.Role.String(),
-                       con.LocalIP, con.LocalPort, con.LocalPid, con.RemoteIP, 
con.RemotePort, string(marshal))
-       }
-}
-
-func (c *Context) saveActiveConnection(con *ConnectionContext) {
-       c.activeConnections.Set(c.generateConnectionKey(con.ConnectionID, 
con.RandomID), con)
-}
-
-type SocketCloseEvent struct {
-       ConID    uint64
-       RandomID uint64
-       ExeTime  uint64
-       Pid      uint32
-       SocketFD uint32
-       Role     ConnectionRole
-       Protocol ConnectionProtocol
-       IsSSL    uint32
-       Fix      uint32
-
-       SocketFamily   uint32
-       RemoteAddrV4   uint32
-       RemoteAddrV6   [16]uint8
-       RemoteAddrPort uint32
-       LocalAddrV4    uint32
-       LocalAddrV6    [16]uint8
-       LocalAddrPort  uint32
-       Fix1           uint32
-
-       WriteBytes   uint64
-       WriteCount   uint64
-       WriteExeTime uint64
-       ReadBytes    uint64
-       ReadCount    uint64
-       ReadExeTime  uint64
-
-       WriteRTTCount   uint64
-       WriteRTTExeTime uint64
-}
-
-// batch to re-process all cached closed event
-func (c *Context) batchReProcessCachedCloseEvent() {
-       for len(c.flushClosedEvents) > 0 {
-               event := <-c.flushClosedEvents
-               if !c.socketClosedEvent0(event) {
-                       // if cannot the found the active connection, then just 
create a new closed connection context
-                       processes := c.processes[int32(event.Pid)]
-                       if len(processes) == 0 {
-                               continue
-                       }
-                       cc := c.newConnectionContext(event.ConID, 
event.RandomID, event.Pid, event.SocketFD, processes, true)
-                       if event.SocketFamily == unix.AF_INET {
-                               cc.RemoteIP = parseAddressV4(event.RemoteAddrV4)
-                               cc.LocalIP = parseAddressV4(event.LocalAddrV4)
-                       } else if event.SocketFamily == unix.AF_INET6 {
-                               cc.RemoteIP = parseAddressV6(event.RemoteAddrV6)
-                               cc.LocalIP = parseAddressV6(event.LocalAddrV6)
-                       } else {
-                               continue
-                       }
-
-                       // append to the closed connection
-                       c.closedConnections = append(c.closedConnections, 
c.combineClosedConnection(cc, event))
-               }
-       }
-}
-
-func (c *Context) newConnectionContext(conID, randomID uint64, pid, fd uint32, 
processes []api.ProcessInterface, conClosed bool) *ConnectionContext {
-       return &ConnectionContext{
-               // metadata
-               ConnectionID:     conID,
-               RandomID:         randomID,
-               LocalPid:         pid,
-               SocketFD:         fd,
-               LocalProcesses:   processes,
-               ConnectionClosed: conClosed,
-
-               // metrics
-               WriteCounter:          NewSocketDataCounterWithHistory(),
-               ReadCounter:           NewSocketDataCounterWithHistory(),
-               WriteRTTCounter:       NewSocketDataCounterWithHistory(),
-               WriteRTTHistogram:     
NewSocketDataHistogramWithHistory(HistogramDataUnitUS),
-               WriteExeTimeHistogram: 
NewSocketDataHistogramWithHistory(HistogramDataUnitNS),
-               ReadExeTimeHistogram:  
NewSocketDataHistogramWithHistory(HistogramDataUnitNS),
-       }
-}
-
-func (c *Context) handleSocketCloseEvent(data interface{}) {
-       event := data.(*SocketCloseEvent)
-
-       if log.Enable(logrus.DebugLevel) {
-               marshal, _ := json.Marshal(event)
-               log.Debugf("found close event: %s", string(marshal))
-       }
-
-       // try to handle the socket close event
-       if !c.socketClosedEvent0(event) {
-               // is not in active connection, maybe it's not have been added 
to activate first
-               // just add to the close queue, wait for the flush connection 
with interval
-               c.flushClosedEvents <- event
-               return
-       }
-}
-
-// SocketExceptionOperationEvent Socket have been retransmitted/drop the 
package event
-type SocketExceptionOperationEvent struct {
-       Pid            uint32
-       SocketFamily   uint32
-       RemoteAddrV4   uint32
-       RemoteAddrV6   [16]uint8
-       RemoteAddrPort uint32
-       Type           SocketExceptionOperationType
-}
-
-func (c *Context) handleSocketExceptionOperationEvent(data interface{}) {
-       event := data.(*SocketExceptionOperationEvent)
-       c.socketExceptionOperationLock.Lock()
-       defer c.socketExceptionOperationLock.Unlock()
-
-       key := SocketBasicKey{
-               Pid:          event.Pid,
-               Family:       event.SocketFamily,
-               RemoteAddrV4: event.RemoteAddrV4,
-               RemoteAddrV6: event.RemoteAddrV6,
-               RemotePort:   event.RemoteAddrPort,
-       }
-       exceptionValue := c.socketExceptionStatics[key]
-       if exceptionValue == nil {
-               exceptionValue = &SocketExceptionValue{}
-               c.socketExceptionStatics[key] = exceptionValue
-       }
-
-       switch event.Type {
-       case SocketExceptionOperationRetransmit:
-               exceptionValue.RetransmitCount++
-       case SocketExceptionOperationDrop:
-               exceptionValue.DropCount++
-       default:
-               log.Warnf("unknown socket exception operation type: %d", 
event.Type)
-       }
-
-       if log.Enable(logrus.DebugLevel) {
-               marshal, _ := json.Marshal(event)
-               log.Debugf("found socket exception operation event: %s", 
string(marshal))
-       }
-}
-
-func (c *Context) socketClosedEvent0(event *SocketCloseEvent) bool {
-       activeCon := c.foundAndDeleteConnection(event)
-       if activeCon == nil {
-               return false
-       }
-
-       // combine the connection data
-       c.closedConnections = append(c.closedConnections, 
c.combineClosedConnection(activeCon, event))
-       return true
-}
-
-func (c *Context) foundAndDeleteConnection(event *SocketCloseEvent) 
*ConnectionContext {
-       conKey := c.generateConnectionKey(event.ConID, event.RandomID)
-       val, exists := c.activeConnections.Pop(conKey)
-       if !exists {
-               return nil
-       }
-       return val.(*ConnectionContext)
-}
-
-func (c *Context) deleteConnectionOnly(ccs []string) {
-       for _, cc := range ccs {
-               c.activeConnections.Remove(cc)
-       }
-}
-
-func (c *Context) combineClosedConnection(active *ConnectionContext, closed 
*SocketCloseEvent) *ConnectionContext {
-       active.ConnectionClosed = true
-
-       if active.Role == ConnectionRoleUnknown && closed.Role != 
ConnectionRoleUnknown {
-               active.Role = closed.Role
-       }
-       if active.Protocol == ConnectionProtocolUnknown && closed.Protocol != 
ConnectionProtocolUnknown {
-               active.Protocol = closed.Protocol
-       }
-       if !active.IsSSL && closed.IsSSL == 1 {
-               active.IsSSL = true
-       }
-
-       active.WriteCounter.UpdateToCurrent(closed.WriteBytes, 
closed.WriteCount, closed.WriteExeTime)
-       active.ReadCounter.UpdateToCurrent(closed.ReadBytes, closed.ReadCount, 
closed.ReadExeTime)
-       active.WriteRTTCounter.UpdateToCurrent(0, closed.WriteRTTCount, 
closed.WriteRTTExeTime)
-       active.CloseExecuteTime = closed.ExeTime
-       return active
-}
-
-func (c *Context) generateConnectionKey(conID, randomID uint64) string {
-       return fmt.Sprintf("%d_%d", conID, randomID)
-}
-
-func (c *Context) AddProcesses(processes []api.ProcessInterface) error {
-       var err error
-       for _, p := range processes {
-               pid := p.Pid()
-               alreadyExists := false
-               if len(c.processes[pid]) > 0 {
-                       for _, existsProcess := range c.processes[pid] {
-                               if p.ID() == existsProcess.ID() {
-                                       alreadyExists = true
-                                       break
-                               }
-                       }
-               }
-
-               if alreadyExists {
-                       continue
-               }
-
-               c.processes[pid] = append(c.processes[pid], p)
-
-               // add to the process let it could be monitored
-               if err1 := c.bpf.ProcessMonitorControl.Update(uint32(pid), 
uint32(1), ebpf.UpdateAny); err1 != nil {
-                       err = multierror.Append(err, err1)
-               }
-
-               // add process ssl config
-               if err1 := addSSLProcess(int(pid), c.bpf, c.linker); err1 != 
nil {
-                       err = multierror.Append(err, err1)
-               }
-
-               log.Debugf("add monitor process, pid: %d", pid)
-       }
-       return err
-}
-
-func (c *Context) DeleteProcesses(processes []api.ProcessInterface) 
(emptyProcesses bool, deleteError error) {
-       var err error
-       for _, p := range processes {
-               pid := p.Pid()
-               existsProcesses := make([]api.ProcessInterface, 0)
-               existsProcesses = append(existsProcesses, c.processes[pid]...)
-
-               // update process entities
-               newProcesses := make([]api.ProcessInterface, 0)
-
-               for _, existProcess := range existsProcesses {
-                       if p.ID() != existProcess.ID() {
-                               newProcesses = append(newProcesses, 
existProcess)
-                       }
-               }
-
-               // no process need delete, then just ignore
-               if len(newProcesses) == len(existsProcesses) {
-                       continue
-               }
-
-               // the process no need to monitor, then just ignore
-               if len(newProcesses) == 0 {
-                       if err1 := 
c.bpf.ProcessMonitorControl.Delete(uint32(pid)); err1 != nil {
-                               err = multierror.Append(err, err1)
-                       }
-                       log.Debugf("delete monitor process: %d", pid)
-                       delete(c.processes, pid)
-                       continue
-               }
-               c.processes[pid] = newProcesses
-       }
-       return len(c.processes) == 0, err
-}
-
-func parseAddressV4(val uint32) string {
-       return net.IP((*(*[net.IPv4len]byte)(unsafe.Pointer(&val)))[:]).String()
-}
-
-func parseAddressV6(val [16]uint8) string {
-       return net.IP((*(*[net.IPv6len]byte)(unsafe.Pointer(&val)))[:]).String()
-}
diff --git a/pkg/profiling/task/network/metrics.go 
b/pkg/profiling/task/network/metrics.go
deleted file mode 100644
index a52d5e3..0000000
--- a/pkg/profiling/task/network/metrics.go
+++ /dev/null
@@ -1,396 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package network
-
-import (
-       "fmt"
-       "time"
-
-       "github.com/apache/skywalking-rover/pkg/process/api"
-       "github.com/apache/skywalking-rover/pkg/tools"
-
-       v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
-)
-
-type SocketDataCounter struct {
-       Bytes   uint64
-       Count   uint64
-       ExeTime uint64
-}
-
-func NewSocketDataCounter() *SocketDataCounter {
-       return &SocketDataCounter{}
-}
-
-func (s *SocketDataCounter) Increase(d *SocketDataCounter) {
-       s.IncreaseByValue(d.Bytes, d.Count, d.ExeTime)
-}
-
-func (s *SocketDataCounter) IncreaseByValue(bytes, count, exeTime uint64) {
-       s.Bytes += bytes
-       s.Count += count
-       s.ExeTime += exeTime
-}
-
-func (s *SocketDataCounter) NotEmpty() bool {
-       return s.Count > 0
-}
-
-// SocketDataCounterWithHistory means the socket send/receive data metrics
-type SocketDataCounterWithHistory struct {
-       Pre *SocketDataCounter
-       Cur *SocketDataCounter
-}
-
-func NewSocketDataCounterWithHistory() *SocketDataCounterWithHistory {
-       return &SocketDataCounterWithHistory{
-               Pre: NewSocketDataCounter(),
-               Cur: NewSocketDataCounter(),
-       }
-}
-
-func (c *SocketDataCounterWithHistory) UpdateToCurrent(bytes, count, exeTime 
uint64) {
-       c.Pre = c.Cur
-       c.Cur = &SocketDataCounter{
-               Bytes:   bytes,
-               Count:   count,
-               ExeTime: exeTime,
-       }
-}
-
-func (c *SocketDataCounterWithHistory) CalculateIncrease() *SocketDataCounter {
-       return &SocketDataCounter{
-               Bytes:   subtractionValue(c.Cur.Bytes, c.Pre.Bytes),
-               Count:   subtractionValue(c.Cur.Count, c.Pre.Count),
-               ExeTime: subtractionValue(c.Cur.ExeTime, c.Pre.ExeTime),
-       }
-}
-
-// SocketHistogramBucketsNs means the histogram bucket: 0ms, 0.01ms, 0.05ms, 
0.1ms, 0.5ms, 1ms, 1.2ms, 1.5ms, 1.7ms, 2ms,
-// 2.5ms, 3ms, 5ms, 7ms, 10ms, 13ms, 16ms, 20ms, 25ms, 30ms, 35ms, 40ms, 45ms, 
50ms, 70ms, 100ms, 150ms,
-// 200ms, 300ms, 500ms, 1s, 2s, 3s, 5s
-// value unit: ns
-var SocketHistogramBucketsNs = []float64{0, 10000, 50000, 100000, 500000, 
1000000, 1200000, 1500000, 1700000, 2000000,
-       2500000, 3000000, 5000000, 7000000, 10000000, 13000000, 16000000, 
20000000, 25000000, 30000000, 35000000, 40000000,
-       45000000, 50000000, 70000000, 100000000, 150000000, 200000000, 
300000000, 500000000, 1000000000, 2000000000,
-       3000000000, 5000000000}
-
-// SocketHistogramBucketsUs same with SocketHistogramBucketsNs, but the value 
unit: us
-var SocketHistogramBucketsUs = []float64{0, 10, 50, 100, 500, 1000, 1200, 
1500, 1700, 2000,
-       2500, 3000, 5000, 7000, 10000, 13000, 16000, 20000, 25000, 30000, 
35000, 40000,
-       45000, 50000, 70000, 100000, 150000, 200000, 300000, 500000, 1000000, 
2000000,
-       3000000, 5000000}
-var SocketHistogramBucketsCount = len(SocketHistogramBucketsNs)
-
-type SocketDataHistogram struct {
-       Unit    HistogramDataUnit
-       Buckets map[uint64]uint32
-}
-
-func (h *SocketDataHistogram) Overwrite(other *SocketDataHistogram) {
-       for k, v := range other.Buckets {
-               h.Buckets[k] = v
-       }
-}
-
-func (h *SocketDataHistogram) Update(bucket uint64, value uint32) {
-       h.Buckets[bucket] = value
-}
-
-func (h *SocketDataHistogram) Increase(other *SocketDataHistogram) {
-       for k, v := range other.Buckets {
-               h.Buckets[k] += v
-       }
-}
-
-func (h *SocketDataHistogram) IncreaseByValue(val uint64) {
-       floatVal := float64(val)
-       for inx, curVal := range SocketHistogramBucketsNs {
-               if inx > 0 && curVal > floatVal {
-                       h.Buckets[uint64(inx-1)]++
-                       return
-               }
-       }
-       h.Buckets[uint64(len(SocketHistogramBucketsNs)-1)]++
-}
-
-func (h *SocketDataHistogram) NotEmpty() bool {
-       for _, v := range h.Buckets {
-               if v > 0 {
-                       return true
-               }
-       }
-       return false
-}
-
-func NewSocketDataHistogram(unit HistogramDataUnit) *SocketDataHistogram {
-       buckets := make(map[uint64]uint32, SocketHistogramBucketsCount)
-       for i := 0; i < SocketHistogramBucketsCount; i++ {
-               buckets[uint64(i)] = 0
-       }
-       return &SocketDataHistogram{
-               Unit:    unit,
-               Buckets: buckets,
-       }
-}
-
-type HistogramDataUnit int
-
-const (
-       HistogramDataUnitNS HistogramDataUnit = 1
-       HistogramDataUnitUS HistogramDataUnit = 2
-)
-
-type SocketDataHistogramWithHistory struct {
-       Pre *SocketDataHistogram
-       Cur *SocketDataHistogram
-}
-
-func NewSocketDataHistogramWithHistory(unit HistogramDataUnit) 
*SocketDataHistogramWithHistory {
-       return &SocketDataHistogramWithHistory{
-               Pre: NewSocketDataHistogram(unit),
-               Cur: NewSocketDataHistogram(unit),
-       }
-}
-
-func (h *SocketDataHistogramWithHistory) RefreshCurrent() {
-       // storage the current value to the previous buckets
-       h.Pre.Overwrite(h.Cur)
-}
-
-func (h *SocketDataHistogramWithHistory) UpdateToCurrent(bucket uint64, val 
uint32) {
-       h.Cur.Update(bucket, val)
-}
-
-func (h *SocketDataHistogramWithHistory) CalculateIncrease() 
*SocketDataHistogram {
-       histogram := NewSocketDataHistogram(h.Cur.Unit)
-       var increaseVal uint32
-       for curK, curV := range h.Cur.Buckets {
-               if increaseVal = curV - h.Pre.Buckets[curK]; increaseVal > 0 {
-                       histogram.Buckets[curK] = increaseVal
-               }
-       }
-       return histogram
-}
-
-func subtractionValue(v1, v2 uint64) uint64 {
-       if v1 > v2 {
-               return v1 - v2
-       }
-       return 0
-}
-
-type ProcessTraffic struct {
-       analyzer *TrafficAnalyzer
-
-       // local process information
-       LocalPid       uint32
-       LocalProcesses []api.ProcessInterface
-       LocalIP        string
-       LocalPort      uint16
-
-       // current connection role of local process
-       ConnectionRole ConnectionRole
-       // the protocol of the connection
-       Protocol ConnectionProtocol
-       // current connection is SSL
-       IsSSL bool
-
-       // remote process/address information
-       RemoteIP        string
-       RemotePort      uint16
-       RemotePid       uint32
-       RemoteProcesses []api.ProcessInterface
-
-       // statics
-       WriteCounter *SocketDataCounter
-       ReadCounter  *SocketDataCounter
-       // write RTT
-       WriteRTTCounter *SocketDataCounter
-
-       // connection operate
-       ConnectCounter *SocketDataCounter
-       CloseCounter   *SocketDataCounter
-
-       // exception operate
-       RetransmitCounter *SocketDataCounter
-       DropCounter       *SocketDataCounter
-
-       // histograms
-       // write execute time and RTT
-       WriteRTTHistogram     *SocketDataHistogram
-       WriteExeTimeHistogram *SocketDataHistogram
-       // read execute time
-       ReadExeTimeHistogram *SocketDataHistogram
-
-       // connection operate
-       ConnectExeTimeHistogram *SocketDataHistogram
-       CloseExeTimeHistogram   *SocketDataHistogram
-}
-
-func (r *ProcessTraffic) ContainsAnyTraffic() bool {
-       return r.WriteCounter.NotEmpty() || r.ReadCounter.NotEmpty() || 
r.WriteRTTCounter.NotEmpty() || r.ConnectCounter.NotEmpty() ||
-               r.CloseCounter.NotEmpty() || r.WriteRTTHistogram.NotEmpty() || 
r.WriteExeTimeHistogram.NotEmpty() || r.ReadExeTimeHistogram.NotEmpty() ||
-               r.ConnectExeTimeHistogram.NotEmpty() || 
r.CloseExeTimeHistogram.NotEmpty()
-}
-
-func (r *ProcessTraffic) GenerateMetrics(metricsPrefix string) 
[]*v3.MeterDataCollection {
-       result := make([]*v3.MeterDataCollection, 0)
-       for _, p := range r.LocalProcesses {
-               collection := make([]*v3.MeterData, 0)
-               collection = r.appendCounterValues(collection, metricsPrefix, 
"write", p, r.WriteCounter)
-               collection = r.appendCounterValues(collection, metricsPrefix, 
"read", p, r.ReadCounter)
-               collection = r.appendCounterValues(collection, metricsPrefix, 
"write_rtt", p, r.WriteRTTCounter)
-               collection = r.appendCounterValues(collection, metricsPrefix, 
"connect", p, r.ConnectCounter)
-               collection = r.appendCounterValues(collection, metricsPrefix, 
"close", p, r.CloseCounter)
-               collection = r.appendCounterValues(collection, metricsPrefix, 
"retransmit", p, r.RetransmitCounter)
-               collection = r.appendCounterValues(collection, metricsPrefix, 
"drop", p, r.DropCounter)
-
-               collection = r.appendHistogramValue(collection, metricsPrefix, 
"write_rtt", p, r.WriteRTTHistogram)
-               collection = r.appendHistogramValue(collection, metricsPrefix, 
"write_exe_time", p, r.WriteExeTimeHistogram)
-               collection = r.appendHistogramValue(collection, metricsPrefix, 
"read_exe_time", p, r.ReadExeTimeHistogram)
-               collection = r.appendHistogramValue(collection, metricsPrefix, 
"connect_exe_time", p, r.ConnectExeTimeHistogram)
-               collection = r.appendHistogramValue(collection, metricsPrefix, 
"close_exe_time", p, r.CloseExeTimeHistogram)
-
-               if len(collection) == 0 {
-                       continue
-               }
-
-               // add entity
-               collection[0].Service = p.Entity().ServiceName
-               collection[0].ServiceInstance = p.Entity().InstanceName
-               collection[0].Timestamp = time.Now().UnixMilli()
-               result = append(result, &v3.MeterDataCollection{
-                       MeterData: collection,
-               })
-       }
-
-       return result
-}
-
-func (r *ProcessTraffic) appendCounterValues(metrics []*v3.MeterData, 
metricsPrefix, name string, local api.ProcessInterface,
-       counter *SocketDataCounter) []*v3.MeterData {
-       if !counter.NotEmpty() {
-               return metrics
-       }
-
-       count := float64(counter.Count)
-       metrics = append(metrics, r.buildSingleValue(metricsPrefix, 
name+"_counts_counter", local, count))
-       if counter.Bytes > 0 {
-               metrics = append(metrics, r.buildSingleValue(metricsPrefix, 
name+"_bytes_counter", local, float64(counter.Bytes)))
-       }
-       if counter.ExeTime > 0 {
-               metrics = append(metrics, r.buildSingleValue(metricsPrefix, 
name+"_exe_time_counter", local, float64(counter.ExeTime)/count))
-       }
-       return metrics
-}
-
-func (r *ProcessTraffic) appendHistogramValue(metrics []*v3.MeterData, 
metricsPrefix, name string,
-       local api.ProcessInterface, histogram *SocketDataHistogram) 
[]*v3.MeterData {
-       if !histogram.NotEmpty() {
-               return metrics
-       }
-
-       role, labels := r.buildBasicMeterLabels(local)
-       values := make([]*v3.MeterBucketValue, 0)
-       for bucket, count := range histogram.Buckets {
-               var bucketInx = int(bucket)
-               if bucketInx >= SocketHistogramBucketsCount {
-                       bucketInx = SocketHistogramBucketsCount - 1
-               }
-               var buckets []float64
-               if histogram.Unit == HistogramDataUnitUS {
-                       buckets = SocketHistogramBucketsUs
-               } else {
-                       buckets = SocketHistogramBucketsNs
-               }
-               values = append(values, &v3.MeterBucketValue{
-                       Bucket: buckets[bucketInx],
-                       Count:  int64(count),
-               })
-       }
-
-       return append(metrics, &v3.MeterData{
-               Metric: &v3.MeterData_Histogram{
-                       Histogram: &v3.MeterHistogram{
-                               Name:   fmt.Sprintf("%s%s_%s_histogram", 
metricsPrefix, role.String(), name),
-                               Labels: labels,
-                               Values: values,
-                       },
-               },
-       })
-}
-
-func (r *ProcessTraffic) buildSingleValue(prefix, name string, local 
api.ProcessInterface, val float64) *v3.MeterData {
-       role, labels := r.buildBasicMeterLabels(local)
-
-       return &v3.MeterData{
-               Metric: &v3.MeterData_SingleValue{
-                       SingleValue: &v3.MeterSingleValue{
-                               Name:   fmt.Sprintf("%s%s_%s", prefix, 
role.String(), name),
-                               Labels: labels,
-                               Value:  val,
-                       },
-               },
-       }
-}
-
-func (r *ProcessTraffic) buildBasicMeterLabels(local api.ProcessInterface) 
(ConnectionRole, []*v3.Label) {
-       curRole := r.ConnectionRole
-       // add the default role
-       if curRole == ConnectionRoleUnknown {
-               curRole = ConnectionRoleClient
-       }
-       labels := make([]*v3.Label, 0)
-
-       // two pair process/address info
-       labels = r.appendMeterValue(labels, fmt.Sprintf("%s_process_id", 
curRole.String()), local.ID())
-       labels = r.appendRemoteAddrssInfo(labels, curRole.Revert().String(), 
local)
-
-       labels = r.appendMeterValue(labels, "side", curRole.String())
-
-       // protocol and ssl
-       labels = r.appendMeterValue(labels, "protocol", r.Protocol.String())
-       labels = r.appendMeterValue(labels, "is_ssl", fmt.Sprintf("%t", 
r.IsSSL))
-       return curRole, labels
-}
-
-func (r *ProcessTraffic) appendRemoteAddrssInfo(labels []*v3.Label, prefix 
string, local api.ProcessInterface) []*v3.Label {
-       if len(r.RemoteProcesses) != 0 {
-               for _, p := range r.RemoteProcesses {
-                       // only match with same service instance
-                       if local.Entity().ServiceName == p.Entity().ServiceName 
&&
-                               local.Entity().InstanceName == 
p.Entity().InstanceName {
-                               return r.appendMeterValue(labels, 
prefix+"_process_id", p.ID())
-                       }
-               }
-       }
-
-       if tools.IsLocalHostAddress(r.RemoteIP) || 
r.analyzer.IsLocalAddressInCache(r.RemoteIP) {
-               return r.appendMeterValue(labels, prefix+"_local", "true")
-       }
-
-       return r.appendMeterValue(labels, prefix+"_address", 
fmt.Sprintf("%s:%d", r.RemoteIP, r.RemotePort))
-}
-
-func (r *ProcessTraffic) appendMeterValue(labels []*v3.Label, name, value 
string) []*v3.Label {
-       return append(labels, &v3.Label{
-               Name:  name,
-               Value: value,
-       })
-}
diff --git a/pkg/profiling/task/network/runner.go 
b/pkg/profiling/task/network/runner.go
index c623bfd..b52aa3c 100644
--- a/pkg/profiling/task/network/runner.go
+++ b/pkg/profiling/task/network/runner.go
@@ -24,7 +24,7 @@ import (
        "sync"
        "time"
 
-       "github.com/sirupsen/logrus"
+       "github.com/cilium/ebpf"
 
        "github.com/hashicorp/go-multierror"
 
@@ -35,16 +35,14 @@ import (
        "github.com/apache/skywalking-rover/pkg/module"
        "github.com/apache/skywalking-rover/pkg/process/api"
        "github.com/apache/skywalking-rover/pkg/profiling/task/base"
-       "github.com/apache/skywalking-rover/pkg/tools/btf"
+       "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze"
+       analyzeBase 
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
+       "github.com/apache/skywalking-rover/pkg/profiling/task/network/bpf"
 
        v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
 )
 
-// $BPF_CLANG and $BPF_CFLAGS are set by the Makefile.
-// nolint
-//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -target bpfel -cc 
$BPF_CLANG -cflags $BPF_CFLAGS bpf 
$REPO_ROOT/bpf/profiling/network/netmonitor.c -- -I$REPO_ROOT/bpf/include 
-D__TARGET_ARCH_x86
-
-var log = logger.GetLogger("profiling", "task", "network", "topology")
+var log = logger.GetLogger("profiling", "task", "network")
 
 type Runner struct {
        initOnce       sync.Once
@@ -54,18 +52,19 @@ type Runner struct {
        reportInterval time.Duration
        meterPrefix    string
 
-       bpf        *bpfObjects
-       linker     *Linker
-       bpfContext *Context
+       bpf            *bpf.Loader
+       processes      map[int32][]api.ProcessInterface
+       analyzeContext *analyzeBase.AnalyzerContext
 
        ctx    context.Context
        cancel context.CancelFunc
 }
 
 func NewGlobalRunnerContext() *Runner {
+       processes := make(map[int32][]api.ProcessInterface)
        return &Runner{
-               bpfContext: NewContext(),
-               linker:     NewLinker(),
+               processes:      processes,
+               analyzeContext: analyze.NewContext(processes),
        }
 }
 
@@ -78,7 +77,38 @@ func (r *Runner) init(config *base.TaskConfig, moduleMgr 
*module.Manager) error
 }
 
 func (r *Runner) DeleteProcesses(processes []api.ProcessInterface) (bool, 
error) {
-       return r.bpfContext.DeleteProcesses(processes)
+       var err error
+       for _, p := range processes {
+               pid := p.Pid()
+               existsProcesses := make([]api.ProcessInterface, 0)
+               existsProcesses = append(existsProcesses, r.processes[pid]...)
+
+               // update process entities
+               newProcesses := make([]api.ProcessInterface, 0)
+
+               for _, existProcess := range existsProcesses {
+                       if p.ID() != existProcess.ID() {
+                               newProcesses = append(newProcesses, 
existProcess)
+                       }
+               }
+
+               // no process need delete, then just ignore
+               if len(newProcesses) == len(existsProcesses) {
+                       continue
+               }
+
+               // the process no need to monitor, then just ignore
+               if len(newProcesses) == 0 {
+                       if err1 := 
r.bpf.ProcessMonitorControl.Delete(uint32(pid)); err1 != nil {
+                               err = multierror.Append(err, err1)
+                       }
+                       log.Debugf("delete monitor process: %d", pid)
+                       delete(r.processes, pid)
+                       continue
+               }
+               r.processes[pid] = newProcesses
+       }
+       return len(r.processes) == 0, err
 }
 
 func (r *Runner) Start(ctx context.Context, processes []api.ProcessInterface) 
error {
@@ -86,59 +116,58 @@ func (r *Runner) Start(ctx context.Context, processes 
[]api.ProcessInterface) er
        defer r.startLock.Unlock()
        // if already start, then just adding the processes
        if r.bpf != nil {
-               return r.bpfContext.AddProcesses(processes)
+               return r.addProcesses(processes)
        }
 
        r.ctx, r.cancel = context.WithCancel(ctx)
        // load bpf program
-       objs := bpfObjects{}
-       if err := loadBpfObjects(&objs, btf.GetEBPFCollectionOptionsIfNeed()); 
err != nil {
+       bpfLoader, err := bpf.NewLoader()
+       if err != nil {
                return err
        }
-       r.bpf = &objs
-       r.bpfContext.Init(&objs, r.linker)
+       r.bpf = bpfLoader
 
-       if err := r.bpfContext.AddProcesses(processes); err != nil {
+       if err := r.addProcesses(processes); err != nil {
                return err
        }
 
        // register all handlers
-       r.bpfContext.RegisterAllHandlers()
-       r.bpfContext.StartSocketAddressParser(r.ctx)
+       r.analyzeContext.RegisterAllHandlers(bpfLoader)
+       r.analyzeContext.StartSocketAddressParser(r.ctx)
 
        // sock opts
-       r.linker.AddSysCall("close", objs.SysClose, objs.SysCloseRet)
-       r.linker.AddSysCall("connect", objs.SysConnect, objs.SysConnectRet)
-       r.linker.AddSysCall("accept", objs.SysAccept, objs.SysAcceptRet)
-       r.linker.AddSysCall("accept4", objs.SysAccept, objs.SysAcceptRet)
-       r.linker.AddLink(link.Kretprobe, objs.SockAllocRet, "sock_alloc")
-       r.linker.AddLink(link.Kprobe, objs.TcpConnect, "tcp_connect")
+       bpfLoader.AddSysCall("close", bpfLoader.SysClose, bpfLoader.SysCloseRet)
+       bpfLoader.AddSysCall("connect", bpfLoader.SysConnect, 
bpfLoader.SysConnectRet)
+       bpfLoader.AddSysCall("accept", bpfLoader.SysAccept, 
bpfLoader.SysAcceptRet)
+       bpfLoader.AddSysCall("accept4", bpfLoader.SysAccept, 
bpfLoader.SysAcceptRet)
+       bpfLoader.AddLink(link.Kretprobe, bpfLoader.SockAllocRet, "sock_alloc")
+       bpfLoader.AddLink(link.Kprobe, bpfLoader.TcpConnect, "tcp_connect")
 
        // write/receive data
-       r.linker.AddSysCall("send", objs.SysSend, objs.SysSendRet)
-       r.linker.AddSysCall("sendto", objs.SysSendto, objs.SysSendtoRet)
-       r.linker.AddSysCall("sendmsg", objs.SysSendmsg, objs.SysSendmsgRet)
-       r.linker.AddSysCall("sendmmsg", objs.SysSendmmsg, objs.SysSendmmsgRet)
-       r.linker.AddSysCall("sendfile", objs.SysSendfile, objs.SysSendfileRet)
-       r.linker.AddSysCall("sendfile64", objs.SysSendfile, objs.SysSendfileRet)
-       r.linker.AddSysCall("write", objs.SysWrite, objs.SysWriteRet)
-       r.linker.AddSysCall("writev", objs.SysWritev, objs.SysWritevRet)
-       r.linker.AddSysCall("read", objs.SysRead, objs.SysReadRet)
-       r.linker.AddSysCall("readv", objs.SysReadv, objs.SysReadvRet)
-       r.linker.AddSysCall("recv", objs.SysRecv, objs.SysRecvRet)
-       r.linker.AddSysCall("recvfrom", objs.SysRecvfrom, objs.SysRecvfromRet)
-       r.linker.AddSysCall("recvmsg", objs.SysRecvmsg, objs.SysRecvmsgRet)
-       r.linker.AddSysCall("recvmmsg", objs.SysRecvmmsg, objs.SysRecvmmsgRet)
-       r.linker.AddLink(link.Kprobe, objs.TcpRcvEstablished, 
"tcp_rcv_established")
-       r.linker.AddLink(link.Kprobe, objs.SecuritySocketSendmsg, 
"security_socket_sendmsg")
-       r.linker.AddLink(link.Kprobe, objs.SecuritySocketRecvmsg, 
"security_socket_recvmsg")
+       bpfLoader.AddSysCall("send", bpfLoader.SysSend, bpfLoader.SysSendRet)
+       bpfLoader.AddSysCall("sendto", bpfLoader.SysSendto, 
bpfLoader.SysSendtoRet)
+       bpfLoader.AddSysCall("sendmsg", bpfLoader.SysSendmsg, 
bpfLoader.SysSendmsgRet)
+       bpfLoader.AddSysCall("sendmmsg", bpfLoader.SysSendmmsg, 
bpfLoader.SysSendmmsgRet)
+       bpfLoader.AddSysCall("sendfile", bpfLoader.SysSendfile, 
bpfLoader.SysSendfileRet)
+       bpfLoader.AddSysCall("sendfile64", bpfLoader.SysSendfile, 
bpfLoader.SysSendfileRet)
+       bpfLoader.AddSysCall("write", bpfLoader.SysWrite, bpfLoader.SysWriteRet)
+       bpfLoader.AddSysCall("writev", bpfLoader.SysWritev, 
bpfLoader.SysWritevRet)
+       bpfLoader.AddSysCall("read", bpfLoader.SysRead, bpfLoader.SysReadRet)
+       bpfLoader.AddSysCall("readv", bpfLoader.SysReadv, bpfLoader.SysReadvRet)
+       bpfLoader.AddSysCall("recv", bpfLoader.SysRecv, bpfLoader.SysRecvRet)
+       bpfLoader.AddSysCall("recvfrom", bpfLoader.SysRecvfrom, 
bpfLoader.SysRecvfromRet)
+       bpfLoader.AddSysCall("recvmsg", bpfLoader.SysRecvmsg, 
bpfLoader.SysRecvmsgRet)
+       bpfLoader.AddSysCall("recvmmsg", bpfLoader.SysRecvmmsg, 
bpfLoader.SysRecvmmsgRet)
+       bpfLoader.AddLink(link.Kprobe, bpfLoader.TcpRcvEstablished, 
"tcp_rcv_established")
+       bpfLoader.AddLink(link.Kprobe, bpfLoader.SecuritySocketSendmsg, 
"security_socket_sendmsg")
+       bpfLoader.AddLink(link.Kprobe, bpfLoader.SecuritySocketRecvmsg, 
"security_socket_recvmsg")
 
        // retransmit/drop
-       r.linker.AddLink(link.Kprobe, objs.TcpRetransmit, "tcp_retransmit_skb")
-       r.linker.AddLink(link.Kprobe, objs.TcpDrop, "tcp_drop")
+       bpfLoader.AddLink(link.Kprobe, bpfLoader.TcpRetransmit, 
"tcp_retransmit_skb")
+       bpfLoader.AddLink(link.Kprobe, bpfLoader.TcpDrop, "tcp_drop")
 
-       if err := r.linker.HasError(); err != nil {
-               _ = r.linker.Close()
+       if err := bpfLoader.HasError(); err != nil {
+               _ = bpfLoader.Close()
                return err
        }
 
@@ -165,31 +194,12 @@ func (r *Runner) registerMetricsReport() {
 }
 
 func (r *Runner) flushMetrics() error {
-       // flush all connection from bpf
-       connections, err := r.bpfContext.FlushAllConnection()
+       // flush all metrics
+       metricsBuilder, err := r.analyzeContext.FlushAllMetrics(r.bpf, 
r.meterPrefix)
        if err != nil {
                return err
        }
-       if len(connections) == 0 {
-               return nil
-       }
-
-       if log.Enable(logrus.DebugLevel) {
-               for _, con := range connections {
-                       log.Debugf("found connection: %d, %s relation: 
%s:%d(%d) -> %s:%d, protocol: %s, is_ssl: %t, read: %d bytes/%d, write: %d 
bytes/%d",
-                               con.ConnectionID, con.Role.String(),
-                               con.LocalIP, con.LocalPort, con.LocalPid, 
con.RemoteIP, con.RemotePort,
-                               con.Protocol.String(), con.IsSSL, 
con.WriteCounter.Cur.Bytes, con.WriteCounter.Cur.Count,
-                               con.ReadCounter.Cur.Bytes, 
con.ReadCounter.Cur.Count)
-               }
-       }
-       // combine all connection
-       analyzer := NewTrafficAnalyzer(r.bpfContext.processes)
-       traffics := analyzer.CombineConnectionToTraffics(connections)
-       if len(traffics) == 0 {
-               return nil
-       }
-       r.logTheMetricsConnections(traffics)
+       metrics := metricsBuilder.Build()
 
        // send metrics
        batch, err := r.meterClient.CollectBatch(r.ctx)
@@ -202,13 +212,10 @@ func (r *Runner) flushMetrics() error {
                }
        }()
        count := 0
-       for _, traffic := range traffics {
-               collections := traffic.GenerateMetrics(r.meterPrefix)
-               for _, col := range collections {
-                       count += len(col.MeterData)
-                       if err := batch.Send(col); err != nil {
-                               return err
-                       }
+       for _, m := range metrics {
+               count += len(m.MeterData)
+               if err := batch.Send(m); err != nil {
+                       return err
                }
        }
        if count > 0 {
@@ -217,32 +224,6 @@ func (r *Runner) flushMetrics() error {
        return nil
 }
 
-func (r *Runner) logTheMetricsConnections(traffices []*ProcessTraffic) {
-       if !log.Enable(logrus.DebugLevel) {
-               return
-       }
-       for _, traffic := range traffices {
-               localInfo := fmt.Sprintf("%s:%d(%d)", traffic.LocalIP, 
traffic.LocalPort, traffic.LocalPid)
-               if len(traffic.LocalProcesses) > 0 {
-                       p := traffic.LocalProcesses[0]
-                       localInfo = fmt.Sprintf("(%s)%s:%s:%s(%s:%d)(%d)", 
p.Entity().Layer, p.Entity().ServiceName,
-                               p.Entity().InstanceName, 
p.Entity().ProcessName, traffic.LocalIP, traffic.LocalPort, traffic.LocalPid)
-               }
-
-               remoteInfo := fmt.Sprintf("%s:%d(%d)", traffic.RemoteIP, 
traffic.RemotePort, traffic.RemotePid)
-               if len(traffic.RemoteProcesses) > 0 {
-                       p := traffic.RemoteProcesses[0]
-                       remoteInfo = fmt.Sprintf("(%s)%s:%s:%s(%s:%d)(%d)",
-                               p.Entity().Layer, p.Entity().ServiceName, 
p.Entity().InstanceName, p.Entity().ProcessName,
-                               traffic.RemoteIP, traffic.RemotePort, 
traffic.RemotePid)
-               }
-               side := traffic.ConnectionRole.String()
-               log.Debugf("connection analyze result: %s : %s -> %s, protocol: 
%s, is SSL: %t, write: %d bytes/%d, read: %d bytes/%d",
-                       side, localInfo, remoteInfo, traffic.Protocol.String(), 
traffic.IsSSL, traffic.WriteCounter.Bytes, traffic.WriteCounter.Count,
-                       traffic.ReadCounter.Bytes, traffic.ReadCounter.Count)
-       }
-}
-
 func (r *Runner) Stop() error {
        // if starting, then need to wait start finished
        r.startLock.Lock()
@@ -252,7 +233,6 @@ func (r *Runner) Stop() error {
        }
        var result error
        r.stopOnce.Do(func() {
-               result = r.closeWhenExists(result, r.linker)
                result = r.closeWhenExists(result, r.bpf)
        })
        return result
@@ -284,3 +264,38 @@ func (r *Runner) init0(config *base.TaskConfig, moduleMgr 
*module.Manager) error
        r.meterPrefix = config.Network.MeterPrefix + "_"
        return nil
 }
+
+func (r *Runner) addProcesses(processes []api.ProcessInterface) error {
+       var err error
+       for _, p := range processes {
+               pid := p.Pid()
+               alreadyExists := false
+               if len(r.processes[pid]) > 0 {
+                       for _, existsProcess := range r.processes[pid] {
+                               if p.ID() == existsProcess.ID() {
+                                       alreadyExists = true
+                                       break
+                               }
+                       }
+               }
+
+               if alreadyExists {
+                       continue
+               }
+
+               r.processes[pid] = append(r.processes[pid], p)
+
+               // add to the process let it could be monitored
+               if err1 := r.bpf.ProcessMonitorControl.Update(uint32(pid), 
uint32(1), ebpf.UpdateAny); err1 != nil {
+                       err = multierror.Append(err, err1)
+               }
+
+               // add process ssl config
+               if err1 := addSSLProcess(int(pid), r.bpf); err1 != nil {
+                       err = multierror.Append(err, err1)
+               }
+
+               log.Debugf("add monitor process, pid: %d", pid)
+       }
+       return err
+}
diff --git a/pkg/profiling/task/network/ssl.go 
b/pkg/profiling/task/network/ssl.go
index 9adb7c8..c586c3a 100644
--- a/pkg/profiling/task/network/ssl.go
+++ b/pkg/profiling/task/network/ssl.go
@@ -26,6 +26,8 @@ import (
        "strconv"
        "strings"
 
+       
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
+       "github.com/apache/skywalking-rover/pkg/profiling/task/network/bpf"
        "github.com/apache/skywalking-rover/pkg/tools"
        "github.com/apache/skywalking-rover/pkg/tools/elf"
        "github.com/apache/skywalking-rover/pkg/tools/host"
@@ -52,36 +54,36 @@ type OpenSSLFdSymAddrConfigInBPF struct {
        FDOffset       uint32
 }
 
-func addSSLProcess(pid int, bpf *bpfObjects, linker *Linker) error {
+func addSSLProcess(pid int, loader *bpf.Loader) error {
        modules, err := tools.ProcessModules(int32(pid))
        if err != nil {
                return fmt.Errorf("read process modules error: %d, error: %v", 
pid, err)
        }
 
        // openssl process
-       if err1 := processOpenSSLProcess(pid, bpf, linker, modules); err1 != 
nil {
+       if err1 := processOpenSSLProcess(pid, loader, modules); err1 != nil {
                return err1
        }
 
        // envoy with boring ssl
-       if err1 := processEnvoyProcess(pid, bpf, linker, modules); err1 != nil {
+       if err1 := processEnvoyProcess(pid, loader, modules); err1 != nil {
                return err1
        }
 
        // GoTLS
-       if err1 := processGoProcess(pid, bpf, linker, modules); err1 != nil {
+       if err1 := processGoProcess(pid, loader, modules); err1 != nil {
                return err1
        }
 
        // Nodejs
-       if err1 := processNodeProcess(pid, bpf, linker, modules); err1 != nil {
+       if err1 := processNodeProcess(pid, loader, modules); err1 != nil {
                return err1
        }
 
        return nil
 }
 
-func processOpenSSLProcess(pid int, bpf *bpfObjects, linker *Linker, modules 
[]*profiling.Module) error {
+func processOpenSSLProcess(pid int, loader *bpf.Loader, modules 
[]*profiling.Module) error {
        var libcryptoName, libsslName = "libcrypto.so", "libssl.so"
        var libcryptoPath, libsslPath string
        processModules, err := findProcessModules(modules, libcryptoName, 
libsslName)
@@ -107,22 +109,22 @@ func processOpenSSLProcess(pid int, bpf *bpfObjects, 
linker *Linker, modules []*
        if err != nil {
                return err
        }
-       if err := bpf.OpensslFdSymaddrFinder.Put(uint32(pid), conf); err != nil 
{
+       if err := loader.OpensslFdSymaddrFinder.Put(uint32(pid), conf); err != 
nil {
                return err
        }
 
        // attach the linker
-       return processOpenSSLModule(bpf, processModules[libsslName], linker)
+       return processOpenSSLModule(loader, processModules[libsslName])
 }
 
-func processOpenSSLModule(bpf *bpfObjects, libSSLModule *profiling.Module, 
linker *Linker) error {
-       libSSLLinker := linker.OpenUProbeExeFile(libSSLModule.Path)
-       libSSLLinker.AddLink("SSL_write", bpf.OpensslWrite, bpf.OpensslWriteRet)
-       libSSLLinker.AddLink("SSL_read", bpf.OpensslRead, bpf.OpensslReadRet)
-       return linker.HasError()
+func processOpenSSLModule(loader *bpf.Loader, libSSLModule *profiling.Module) 
error {
+       libSSLLinker := loader.OpenUProbeExeFile(libSSLModule.Path)
+       libSSLLinker.AddLink("SSL_write", loader.OpensslWrite, 
loader.OpensslWriteRet)
+       libSSLLinker.AddLink("SSL_read", loader.OpensslRead, 
loader.OpensslReadRet)
+       return loader.HasError()
 }
 
-func processEnvoyProcess(_ int, bpf *bpfObjects, linker *Linker, modules 
[]*profiling.Module) error {
+func processEnvoyProcess(_ int, loader *bpf.Loader, modules 
[]*profiling.Module) error {
        moduleName := "/envoy"
        processModules, err := findProcessModules(modules, moduleName)
        if err != nil {
@@ -149,14 +151,14 @@ func processEnvoyProcess(_ int, bpf *bpfObjects, linker 
*Linker, modules []*prof
        log.Debugf("found current module is envoy, so attach to the SSL read 
and write")
 
        // attach the linker
-       libSSLLinker := linker.OpenUProbeExeFile(envoyModule.Path)
-       libSSLLinker.AddLink("SSL_write", bpf.OpensslWrite, bpf.OpensslWriteRet)
-       libSSLLinker.AddLink("SSL_read", bpf.OpensslRead, bpf.OpensslReadRet)
-       return linker.HasError()
+       libSSLLinker := loader.OpenUProbeExeFile(envoyModule.Path)
+       libSSLLinker.AddLink("SSL_write", loader.OpensslWrite, 
loader.OpensslWriteRet)
+       libSSLLinker.AddLink("SSL_read", loader.OpensslRead, 
loader.OpensslReadRet)
+       return loader.HasError()
 }
 
 type SymbolLocation struct {
-       Type   GoTLSArgsLocationType
+       Type   base.GoTLSArgsLocationType
        Offset uint32
 }
 
@@ -189,7 +191,7 @@ type GoStringInC struct {
        Size uint64
 }
 
-func processGoProcess(pid int, bpf *bpfObjects, linker *Linker, modules 
[]*profiling.Module) error {
+func processGoProcess(pid int, loader *bpf.Loader, modules 
[]*profiling.Module) error {
        // check current process is go program
        buildVersionSymbol := searchSymbol(modules, func(a, b string) bool {
                return a == b
@@ -220,20 +222,20 @@ func processGoProcess(pid int, bpf *bpfObjects, linker 
*Linker, modules []*profi
        }
 
        // setting the locations
-       if err := bpf.GoTlsArgsSymaddrMap.Put(uint32(pid), symbolConfig); err 
!= nil {
+       if err := loader.GoTlsArgsSymaddrMap.Put(uint32(pid), symbolConfig); 
err != nil {
                return fmt.Errorf("setting the Go TLS argument location 
failure, pid: %d, error: %v", pid, err)
        }
 
        // uprobes
-       exeFile := linker.OpenUProbeExeFile(pidExeFile)
-       exeFile.AddLinkWithType("runtime.casgstatus", true, bpf.GoCasgstatus)
-       exeFile.AddGoLink(goTLSWriteSymbol, bpf.GoTlsWrite, bpf.GoTlsWriteRet, 
elfFile)
-       exeFile.AddGoLink(goTLSReadSymbol, bpf.GoTlsRead, bpf.GoTlsReadRet, 
elfFile)
+       exeFile := loader.OpenUProbeExeFile(pidExeFile)
+       exeFile.AddLinkWithType("runtime.casgstatus", true, loader.GoCasgstatus)
+       exeFile.AddGoLink(goTLSWriteSymbol, loader.GoTlsWrite, 
loader.GoTlsWriteRet, elfFile)
+       exeFile.AddGoLink(goTLSReadSymbol, loader.GoTlsRead, 
loader.GoTlsReadRet, elfFile)
 
-       return linker.HasError()
+       return loader.HasError()
 }
 
-func processNodeProcess(pid int, bpf *bpfObjects, linker *Linker, modules 
[]*profiling.Module) error {
+func processNodeProcess(pid int, loader *bpf.Loader, modules 
[]*profiling.Module) error {
        moduleName1, moduleName2, libsslName := "/nodejs", "/node", "libssl.so"
        processModules, err := findProcessModules(modules, moduleName1, 
moduleName2, libsslName)
        if err != nil {
@@ -271,16 +273,16 @@ func processNodeProcess(pid int, bpf *bpfObjects, linker 
*Linker, modules []*pro
                return err
        }
        // setting the locations
-       if err := bpf.NodeTlsSymaddrMap.Put(uint32(pid), config); err != nil {
+       if err := loader.NodeTlsSymaddrMap.Put(uint32(pid), config); err != nil 
{
                return fmt.Errorf("setting the node TLS location failure, pid: 
%d, error: %v", pid, err)
        }
        // register node tls
-       if err := registerNodeTLSProbes(v, bpf, linker, nodeModule, 
libsslModule); err != nil {
+       if err := registerNodeTLSProbes(v, loader, nodeModule, libsslModule); 
err != nil {
                return fmt.Errorf("register node TLS probes failure, pid: %d, 
error: %v", pid, err)
        }
        // attach the OpenSSL Probe if needs
        if needsReAttachSSL {
-               return processOpenSSLModule(bpf, libsslModule, linker)
+               return processOpenSSLModule(loader, libsslModule)
        }
        return nil
 }
@@ -301,9 +303,9 @@ var nodeTLSAddrWithVersions = []struct {
 
 var nodeTLSProbeWithVersions = []struct {
        v *version.Version
-       f func(uprobe *UProbeExeFile, bpf *bpfObjects, nodeModule 
*profiling.Module)
+       f func(uprobe *bpf.UProbeExeFile, bpf *bpf.Loader, nodeModule 
*profiling.Module)
 }{
-       {version.Build(10, 19, 0), func(uprobe *UProbeExeFile, bpf *bpfObjects, 
nodeModule *profiling.Module) {
+       {version.Build(10, 19, 0), func(uprobe *bpf.UProbeExeFile, bpf 
*bpf.Loader, nodeModule *profiling.Module) {
                
uprobe.AddLinkWithSymbols(searchSymbolNames([]*profiling.Module{nodeModule}, 
strings.HasPrefix, "_ZN4node7TLSWrapC2E"),
                        bpf.NodeTlsWrap, bpf.NodeTlsWrapRet)
                
uprobe.AddLinkWithSymbols(searchSymbolNames([]*profiling.Module{nodeModule}, 
strings.HasPrefix, "_ZN4node7TLSWrap7ClearInE"),
@@ -311,7 +313,7 @@ var nodeTLSProbeWithVersions = []struct {
                
uprobe.AddLinkWithSymbols(searchSymbolNames([]*profiling.Module{nodeModule}, 
strings.HasPrefix, "_ZN4node7TLSWrap8ClearOutE"),
                        bpf.NodeTlsWrap, bpf.NodeTlsWrapRet)
        }},
-       {version.Build(15, 0, 0), func(uprobe *UProbeExeFile, bpf *bpfObjects, 
nodeModule *profiling.Module) {
+       {version.Build(15, 0, 0), func(uprobe *bpf.UProbeExeFile, bpf 
*bpf.Loader, nodeModule *profiling.Module) {
                
uprobe.AddLinkWithSymbols(searchSymbolNames([]*profiling.Module{nodeModule}, 
strings.HasPrefix, "_ZN4node6crypto7TLSWrapC2E"),
                        bpf.NodeTlsWrap, bpf.NodeTlsWrapRet)
                
uprobe.AddLinkWithSymbols(searchSymbolNames([]*profiling.Module{nodeModule}, 
strings.HasPrefix, "_ZN4node6crypto7TLSWrap7ClearInE"),
@@ -344,8 +346,8 @@ func findNodeTLSAddrConfig(v *version.Version) 
(*NodeTLSAddrInBPF, error) {
        return nil, fmt.Errorf("could not support version: %s", v)
 }
 
-func registerNodeTLSProbes(v *version.Version, bpf *bpfObjects, linker 
*Linker, nodeModule, libSSLModule *profiling.Module) error {
-       var probeFunc func(uprobe *UProbeExeFile, bpf *bpfObjects, nodeModule 
*profiling.Module)
+func registerNodeTLSProbes(v *version.Version, loader *bpf.Loader, nodeModule, 
libSSLModule *profiling.Module) error {
+       var probeFunc func(uprobe *bpf.UProbeExeFile, bpf *bpf.Loader, 
nodeModule *profiling.Module)
        for _, c := range nodeTLSProbeWithVersions {
                if v.GreaterOrEquals(c.v) {
                        probeFunc = c.f
@@ -354,13 +356,13 @@ func registerNodeTLSProbes(v *version.Version, bpf 
*bpfObjects, linker *Linker,
        if probeFunc == nil {
                return fmt.Errorf("the version is not support: %v", v)
        }
-       file := linker.OpenUProbeExeFile(nodeModule.Path)
-       probeFunc(file, bpf, nodeModule)
+       file := loader.OpenUProbeExeFile(nodeModule.Path)
+       probeFunc(file, loader, nodeModule)
 
        // find the SSL_new, and register
-       file = linker.OpenUProbeExeFile(libSSLModule.Path)
-       file.AddLinkWithType("SSL_new", false, bpf.NodeTlsRetSsl)
-       return linker.HasError()
+       file = loader.OpenUProbeExeFile(libSSLModule.Path)
+       file.AddLinkWithType("SSL_new", false, loader.NodeTlsRetSsl)
+       return loader.HasError()
 }
 
 func getNodeVersion(p string) (*version.Version, error) {
@@ -493,10 +495,10 @@ func assignGoTLSArgsLocation(err error, function 
*elf.FunctionInfo, argName stri
                return fmt.Errorf("the args is not found, function: %s, args 
name: %s", function.Name(), argName)
        }
        if args.Location.Type == elf.ArgLocationTypeStack {
-               dest.Type = GoTLSArgsLocationTypeStack
+               dest.Type = base.GoTLSArgsLocationTypeStack
                dest.Offset = uint32(args.Location.Offset) + kSPOffset
        } else if args.Location.Type == elf.ArgLocationTypeRegister {
-               dest.Type = GoTLSArgsLocationTypeRegister
+               dest.Type = base.GoTLSArgsLocationTypeRegister
                dest.Offset = uint32(args.Location.Offset)
        } else {
                return fmt.Errorf("the location type is not support, function: 
%s, args name: %s, type: %d",

Reply via email to