This is an automated email from the ASF dual-hosted git repository.
liuhan pushed a commit to branch reduce-handle-connect-time
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/reduce-handle-connect-time by
this push:
new e2a4d32 update conntrack
e2a4d32 is described below
commit e2a4d32331915277288737209a2601b3204e955a
Author: mrproliu <[email protected]>
AuthorDate: Sat Dec 28 15:41:18 2024 +0800
update conntrack
---
pkg/tools/ip/conntrack.go | 80 +++++++++++++++++++++++------------------------
1 file changed, 40 insertions(+), 40 deletions(-)
diff --git a/pkg/tools/ip/conntrack.go b/pkg/tools/ip/conntrack.go
index 4ccc365..124476e 100644
--- a/pkg/tools/ip/conntrack.go
+++ b/pkg/tools/ip/conntrack.go
@@ -58,45 +58,6 @@ func NewConnTrack() (*ConnTrack, error) {
return nil, err
}
- go func() {
- client, _ := conntrack.Dial(nil)
- evCh := make(chan conntrack.Event, 2048)
-
- errCh, err := client.Listen(evCh, 4, []netfilter.NetlinkGroup{
- netfilter.GroupCTNew, // watching for new conntrack events
- })
- if err != nil {
- log.Error(err)
- }
-
- client.SetReadBuffer(26214400) // 25MB
- // Listen to Conntrack events from all network namespaces on the system.
- err = client.SetOption(netlink.ListenAllNSID, true)
- if err != nil {
- log.Error(err)
- }
-
- // Start a goroutine to print all incoming messages on the event channel.
- go func() {
- for {
- e := <-evCh
- if e.Flow.TupleOrig.Proto.DestinationPort == 53 {
- continue
- }
- log.Infof("conntrack: type: %s, origin: %s:%d->%s:%d, reply: %s:%d->%s:%d", e.Type,
- e.Flow.TupleOrig.IP.SourceAddress, e.Flow.TupleOrig.Proto.SourcePort,
- e.Flow.TupleOrig.IP.DestinationAddress, e.Flow.TupleOrig.Proto.DestinationPort,
- e.Flow.TupleReply.IP.SourceAddress, e.Flow.TupleReply.Proto.SourcePort,
- e.Flow.TupleReply.IP.DestinationAddress, e.Flow.TupleReply.Proto.DestinationPort)
- }
- }()
-
- // Stop the program as soon as an error is caught in a decoder goroutine.
- if err := <-errCh; err != nil {
- log.Errorf("conntrack error: %v", err)
- }
- }()
-
return &ConnTrack{
queryClient: queryClient,
eventChain: make(chan conntrack.Event, 2048),
@@ -107,7 +68,7 @@ func NewConnTrack() (*ConnTrack, error) {
func (c *ConnTrack) StartMonitoring(ctx context.Context) error {
c.ctx, c.cancel = context.WithCancel(ctx)
errors := make(chan error, 2)
- errChain, err := c.monitor0(ctx)
+ errChain, err := c.monitor0(c.ctx)
if err != nil {
return err
}
@@ -146,6 +107,45 @@ func (c *ConnTrack) monitor0(ctx context.Context) (chan error, error) {
}
}
+ go func() {
+ client, _ := conntrack.Dial(nil)
+ evCh := make(chan conntrack.Event, 2048)
+
+ errCh, err := client.Listen(evCh, 4, []netfilter.NetlinkGroup{
+ netfilter.GroupCTNew, // watching for new conntrack events
+ })
+ if err != nil {
+ log.Errorf("++++++: %v", err)
+ }
+
+ client.SetReadBuffer(26214400) // 25MB
+ // Listen to Conntrack events from all network namespaces on the system.
+ err = client.SetOption(netlink.ListenAllNSID, true)
+ if err != nil {
+ log.Errorf("0-------: %v", err)
+ }
+
+ // Start a goroutine to print all incoming messages on the event channel.
+ go func() {
+ for {
+ e := <-evCh
+ if e.Flow.TupleOrig.Proto.DestinationPort == 53 {
+ continue
+ }
+ log.Infof("conntrack: type: %s, origin: %s:%d->%s:%d, reply: %s:%d->%s:%d", e.Type,
+ e.Flow.TupleOrig.IP.SourceAddress, e.Flow.TupleOrig.Proto.SourcePort,
+ e.Flow.TupleOrig.IP.DestinationAddress, e.Flow.TupleOrig.Proto.DestinationPort,
+ e.Flow.TupleReply.IP.SourceAddress, e.Flow.TupleReply.Proto.SourcePort,
+ e.Flow.TupleReply.IP.DestinationAddress, e.Flow.TupleReply.Proto.DestinationPort)
+ }
+ }()
+
+ // Stop the program as soon as an error is caught in a decoder goroutine.
+ if err := <-errCh; err != nil {
+ log.Errorf("conntrack error: %v", err)
+ }
+ }()
+
cl, err := conntrack.Dial(nil)
if err != nil {
return nil, err