This is an automated email from the ASF dual-hosted git repository.
liuhan pushed a commit to branch reduce-handle-connect-time
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/reduce-handle-connect-time by
this push:
new 7b9d725 combine data and detail
7b9d725 is described below
commit 7b9d7259703c0b639bace1f50d2783a128dfb732
Author: mrproliu <[email protected]>
AuthorDate: Sun Dec 29 20:15:44 2024 +0800
combine data and detail
---
pkg/accesslog/collector/protocols/queue.go | 14 +++++++-------
pkg/accesslog/common/connection.go | 16 ----------------
pkg/tools/buffer/buffer.go | 14 ++++++++++++++
3 files changed, 21 insertions(+), 23 deletions(-)
diff --git a/pkg/accesslog/collector/protocols/queue.go b/pkg/accesslog/collector/protocols/queue.go
index c3ccd4c..dfd053b 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -175,7 +175,7 @@ func (p *PartitionContext) Start(ctx context.Context) {
select {
case <-timeTicker.C:
// process event with interval
- p.processEvents()
+ p.ProcessEvents()
case <-ctx.Done():
timeTicker.Stop()
return
@@ -189,7 +189,7 @@ func (p *PartitionContext) Start(ctx context.Context) {
for {
select {
case <-expireTicker.C:
- p.processExpireEvents()
+ p.ProcessExpireEvents()
case <-ctx.Done():
expireTicker.Stop()
return
@@ -211,19 +211,19 @@ func (p *PartitionContext) Consume(data interface{}) {
forwarder.SendTransferNoProtocolEvent(p.context, event)
return
}
- connection := p.getConnectionContext(event.GetConnectionID(), event.GetRandomID(), event.GetProtocol(), event.DataID())
+ connection := p.GetConnectionContext(event.GetConnectionID(), event.GetRandomID(), event.GetProtocol(), event.DataID())
connection.AppendDetail(p.context, event)
case *events.SocketDataUploadEvent:
pid, _ := events.ParseConnectionID(event.ConnectionID)
log.Debugf("receive the socket data event, connection ID: %d, random ID: %d, pid: %d, prev data id: %d, "+
"data id: %d, sequence: %d, protocol: %d",
event.ConnectionID, event.RandomID, pid, event.PrevDataID0, event.DataID0, event.Sequence0, event.Protocol0)
- connection := p.getConnectionContext(event.ConnectionID, event.RandomID, event.Protocol0, event.DataID0)
+ connection := p.GetConnectionContext(event.ConnectionID, event.RandomID, event.Protocol0, event.DataID0)
connection.AppendData(event)
}
}
-func (p *PartitionContext) getConnectionContext(connectionID, randomID uint64,
+func (p *PartitionContext) GetConnectionContext(connectionID, randomID uint64,
protocol enums.ConnectionProtocol, currentDataID uint64) *PartitionConnection {
conKey := p.buildConnectionKey(connectionID, randomID)
conn, exist := p.connections.Get(conKey)
@@ -245,7 +245,7 @@ func (p *PartitionContext) buildConnectionKey(conID, ranID uint64) string {
return string(buf)
}
-func (p *PartitionContext) processEvents() {
+func (p *PartitionContext) ProcessEvents() {
// it could be triggered by interval or reach counter
// if any trigger bean locked, the other one just ignore process
if !p.analyzeLocker.TryLock() {
@@ -303,7 +303,7 @@ func (p *PartitionContext) checkTheConnectionIsAlreadyClose(con *PartitionConnec
}
}
-func (p *PartitionContext) processExpireEvents() {
+func (p *PartitionContext) ProcessExpireEvents() {
// the expiry must be mutual exclusion with events processor
p.analyzeLocker.Lock()
defer p.analyzeLocker.Unlock()
diff --git a/pkg/accesslog/common/connection.go b/pkg/accesslog/common/connection.go
index 7fc2550..12e5a5c 100644
--- a/pkg/accesslog/common/connection.go
+++ b/pkg/accesslog/common/connection.go
@@ -47,9 +47,6 @@ import (
)
const (
- // only using to match the remote IP address
- localAddressPairCacheTime = time.Second * 15
-
// clean the active connection in BPF interval
cleanActiveConnectionInterval = time.Second * 20
@@ -60,19 +57,6 @@ const (
connectionCheckExistTime = time.Second * 30
)
-type addressProcessType int
-
-const (
- addressProcessTypeUnknown addressProcessType = iota
- addressProcessTypeLocal
- addressProcessTypeKubernetes
-)
-
-const (
- strLocal = "local"
- strRemote = "remote"
-)
-
type ConnectEventWithSocket struct {
*events.SocketConnectEvent
SocketPair *ip.SocketPair
diff --git a/pkg/tools/buffer/buffer.go b/pkg/tools/buffer/buffer.go
index fe2b77a..4c18bff 100644
--- a/pkg/tools/buffer/buffer.go
+++ b/pkg/tools/buffer/buffer.go
@@ -371,6 +371,20 @@ func (r *Buffer) LastSocketBuffer() SocketDataBuffer {
return r.dataEvents.Back().Value.(SocketDataBuffer)
}
+func (r *Buffer) TotalBuffer() []SocketDataBuffer {
+ if r == nil || r.dataEvents == nil || r.dataEvents.Len() == 0 {
+ return nil
+ }
+ result := make([]SocketDataBuffer, 0, r.dataEvents.Len())
+ for e := r.dataEvents.Front(); e != nil; e = e.Next() {
+ if e.Value == nil {
+ continue
+ }
+ result = append(result, e.Value.(SocketDataBuffer))
+ }
+ return result
+}
+
// DetectNotSendingLastPosition detect the buffer contains not sending data: the BPF limited socket data count
func (r *Buffer) DetectNotSendingLastPosition() *Position {
if r == nil || r.dataEvents.Len() == 0 {