This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch reduce-handle-connect-time
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/reduce-handle-connect-time by this push:
     new 7b9d725  combine data and detail
7b9d725 is described below

commit 7b9d7259703c0b639bace1f50d2783a128dfb732
Author: mrproliu <[email protected]>
AuthorDate: Sun Dec 29 20:15:44 2024 +0800

    combine data and detail
---
 pkg/accesslog/collector/protocols/queue.go | 14 +++++++-------
 pkg/accesslog/common/connection.go         | 16 ----------------
 pkg/tools/buffer/buffer.go                 | 14 ++++++++++++++
 3 files changed, 21 insertions(+), 23 deletions(-)

diff --git a/pkg/accesslog/collector/protocols/queue.go b/pkg/accesslog/collector/protocols/queue.go
index c3ccd4c..dfd053b 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -175,7 +175,7 @@ func (p *PartitionContext) Start(ctx context.Context) {
                        select {
                        case <-timeTicker.C:
                                // process event with interval
-                               p.processEvents()
+                               p.ProcessEvents()
                        case <-ctx.Done():
                                timeTicker.Stop()
                                return
@@ -189,7 +189,7 @@ func (p *PartitionContext) Start(ctx context.Context) {
                for {
                        select {
                        case <-expireTicker.C:
-                               p.processExpireEvents()
+                               p.ProcessExpireEvents()
                        case <-ctx.Done():
                                expireTicker.Stop()
                                return
@@ -211,19 +211,19 @@ func (p *PartitionContext) Consume(data interface{}) {
                        forwarder.SendTransferNoProtocolEvent(p.context, event)
                        return
                }
-               connection := p.getConnectionContext(event.GetConnectionID(), 
event.GetRandomID(), event.GetProtocol(), event.DataID())
+               connection := p.GetConnectionContext(event.GetConnectionID(), 
event.GetRandomID(), event.GetProtocol(), event.DataID())
                connection.AppendDetail(p.context, event)
        case *events.SocketDataUploadEvent:
                pid, _ := events.ParseConnectionID(event.ConnectionID)
                log.Debugf("receive the socket data event, connection ID: %d, 
random ID: %d, pid: %d, prev data id: %d, "+
                        "data id: %d, sequence: %d, protocol: %d",
                        event.ConnectionID, event.RandomID, pid, 
event.PrevDataID0, event.DataID0, event.Sequence0, event.Protocol0)
-               connection := p.getConnectionContext(event.ConnectionID, 
event.RandomID, event.Protocol0, event.DataID0)
+               connection := p.GetConnectionContext(event.ConnectionID, 
event.RandomID, event.Protocol0, event.DataID0)
                connection.AppendData(event)
        }
 }
 
-func (p *PartitionContext) getConnectionContext(connectionID, randomID uint64,
+func (p *PartitionContext) GetConnectionContext(connectionID, randomID uint64,
        protocol enums.ConnectionProtocol, currentDataID uint64) 
*PartitionConnection {
        conKey := p.buildConnectionKey(connectionID, randomID)
        conn, exist := p.connections.Get(conKey)
@@ -245,7 +245,7 @@ func (p *PartitionContext) buildConnectionKey(conID, ranID uint64) string {
        return string(buf)
 }
 
-func (p *PartitionContext) processEvents() {
+func (p *PartitionContext) ProcessEvents() {
        // it could be triggered by interval or reach counter
        // if any trigger bean locked, the other one just ignore process
        if !p.analyzeLocker.TryLock() {
@@ -303,7 +303,7 @@ func (p *PartitionContext) checkTheConnectionIsAlreadyClose(con *PartitionConnec
        }
 }
 
-func (p *PartitionContext) processExpireEvents() {
+func (p *PartitionContext) ProcessExpireEvents() {
        // the expiry must be mutual exclusion with events processor
        p.analyzeLocker.Lock()
        defer p.analyzeLocker.Unlock()
diff --git a/pkg/accesslog/common/connection.go b/pkg/accesslog/common/connection.go
index 7fc2550..12e5a5c 100644
--- a/pkg/accesslog/common/connection.go
+++ b/pkg/accesslog/common/connection.go
@@ -47,9 +47,6 @@ import (
 )
 
 const (
-       // only using to match the remote IP address
-       localAddressPairCacheTime = time.Second * 15
-
        // clean the active connection in BPF interval
        cleanActiveConnectionInterval = time.Second * 20
 
@@ -60,19 +57,6 @@ const (
        connectionCheckExistTime = time.Second * 30
 )
 
-type addressProcessType int
-
-const (
-       addressProcessTypeUnknown addressProcessType = iota
-       addressProcessTypeLocal
-       addressProcessTypeKubernetes
-)
-
-const (
-       strLocal  = "local"
-       strRemote = "remote"
-)
-
 type ConnectEventWithSocket struct {
        *events.SocketConnectEvent
        SocketPair *ip.SocketPair
diff --git a/pkg/tools/buffer/buffer.go b/pkg/tools/buffer/buffer.go
index fe2b77a..4c18bff 100644
--- a/pkg/tools/buffer/buffer.go
+++ b/pkg/tools/buffer/buffer.go
@@ -371,6 +371,20 @@ func (r *Buffer) LastSocketBuffer() SocketDataBuffer {
        return r.dataEvents.Back().Value.(SocketDataBuffer)
 }
 
+func (r *Buffer) TotalBuffer() []SocketDataBuffer {
+       if r == nil || r.dataEvents == nil || r.dataEvents.Len() == 0 {
+               return nil
+       }
+       result := make([]SocketDataBuffer, 0, r.dataEvents.Len())
+       for e := r.dataEvents.Front(); e != nil; e = e.Next() {
+               if e.Value == nil {
+                       continue
+               }
+               result = append(result, e.Value.(SocketDataBuffer))
+       }
+       return result
+}
+
 // DetectNotSendingLastPosition detect the buffer contains not sending data: the BPF limited socket data count
 func (r *Buffer) DetectNotSendingLastPosition() *Position {
        if r == nil || r.dataEvents.Len() == 0 {

Reply via email to