This is an automated email from the ASF dual-hosted git repository.
liuhan pushed a commit to branch tmp_disable_reading
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/tmp_disable_reading by this push:
new 933ce0e Add max data id support
933ce0e is described below
commit 933ce0e18cdedd1e0a9afe2e1ac51529ccbd70d1
Author: mrproliu <[email protected]>
AuthorDate: Thu Dec 19 16:02:59 2024 +0800
Add max data id support
---
pkg/accesslog/collector/protocols/queue.go | 9 +++++----
pkg/accesslog/events/data.go | 6 ++++++
pkg/profiling/task/network/analyze/events/data.go | 5 +++++
.../task/network/analyze/layer7/protocols/base/analyzer.go | 1 +
.../network/analyze/layer7/protocols/http1/reader/request.go | 7 +++++++
pkg/tools/buffer/buffer.go | 8 ++++++++
6 files changed, 32 insertions(+), 4 deletions(-)
diff --git a/pkg/accesslog/collector/protocols/queue.go b/pkg/accesslog/collector/protocols/queue.go
index d071fe7..37af48d 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -241,8 +241,9 @@ func (p *PartitionContext) Consume(data interface{}) {
status = 2
}
}
- log.Infof("receive the socket data event, connection ID: %d,
random ID: %d, pid: %d, data id: %d, sequence: %d, protocol: %d, http1 type:
%d",
- event.ConnectionID, event.RandomID, pid, event.DataID0,
event.Sequence0, event.Protocol0, status)
+ log.Infof("receive the socket data event, connection ID: %d,
random ID: %d, pid: %d, prev data id: %d, "+
+ "data id: %d, sequence: %d, protocol: %d, http1 type:
%d",
+ event.ConnectionID, event.RandomID, pid,
event.PrevDataID0, event.DataID0, event.Sequence0, event.Protocol0, status)
connection := p.getConnectionContext(event.ConnectionID,
event.RandomID, event.Protocol0, event.DataID0)
connection.AppendData(event)
}
@@ -252,7 +253,7 @@ func (p *PartitionContext)
getConnectionContext(connectionID, randomID uint64,
protocol enums.ConnectionProtocol, currentDataID uint64)
*PartitionConnection {
conKey := p.buildConnectionKey(connectionID, randomID)
conn, exist := p.connections.Get(conKey)
- log.Infof("get the connection context, connection ID: %d, random ID:
%d, partition number: %d, connection exist: %t"+
+ log.Infof("get the connection context, connection ID: %d, random ID:
%d, partition number: %d, connection exist: %t, "+
"protoocl: %d, current data ID: %d, partition context: %p",
connectionID, randomID, p.partitionNum, exist, protocol,
currentDataID, p)
if exist {
@@ -301,7 +302,7 @@ func (p *PartitionContext) processEvents() {
info.closeCallback()
}
closedConnections = append(closedConnections, conKey)
- log.Debugf("detect the connection is already closed,
then notify to the callback, connection ID: %d, random ID: %d, partition
number: %d",
+ log.Infof("detect the connection is already closed,
then notify to the callback, connection ID: %d, random ID: %d, partition
number: %d",
info.connectionID, info.randomID,
p.partitionNum)
}
})
diff --git a/pkg/accesslog/events/data.go b/pkg/accesslog/events/data.go
index 83e4ba4..6ecfd2f 100644
--- a/pkg/accesslog/events/data.go
+++ b/pkg/accesslog/events/data.go
@@ -36,6 +36,7 @@ type SocketDataUploadEvent struct {
ConnectionID uint64
RandomID uint64
DataID0 uint64
+ PrevDataID0 uint64
TotalSize0 uint64
Buffer [2048]byte
}
@@ -52,6 +53,7 @@ func (s *SocketDataUploadEvent) ReadFrom(r *reader.Reader) {
s.ConnectionID = r.ReadUint64()
s.RandomID = r.ReadUint64()
s.DataID0 = r.ReadUint64()
+ s.PrevDataID0 = r.ReadUint64()
s.TotalSize0 = r.ReadUint64()
r.ReadUint8Array(s.Buffer[:], 2048)
}
@@ -96,6 +98,10 @@ func (s *SocketDataUploadEvent) DataID() uint64 {
return s.DataID0
}
+func (s *SocketDataUploadEvent) PrevDataID() uint64 {
+ return s.PrevDataID0
+}
+
func (s *SocketDataUploadEvent) DataSequence() int {
return int(s.Sequence0)
}
diff --git a/pkg/profiling/task/network/analyze/events/data.go b/pkg/profiling/task/network/analyze/events/data.go
index 1f71fa8..bc451fc 100644
--- a/pkg/profiling/task/network/analyze/events/data.go
+++ b/pkg/profiling/task/network/analyze/events/data.go
@@ -35,6 +35,7 @@ type SocketDataUploadEvent struct {
ConnectionID uint64
RandomID uint64
DataID0 uint64
+ PrevDataID0 uint64
TotalSize0 uint64
Buffer [2048]byte
}
@@ -79,6 +80,10 @@ func (s *SocketDataUploadEvent) DataID() uint64 {
return s.DataID0
}
+func (s *SocketDataUploadEvent) PrevDataID() uint64 {
+ return s.PrevDataID0
+}
+
func (s *SocketDataUploadEvent) DataSequence() int {
return int(s.Sequence0)
}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go b/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
index 1acc342..357c2bb 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
@@ -154,6 +154,7 @@ func (a *ProtocolAnalyzer) processEvents() {
for _, conKey := range closedConnections {
a.connections.Remove(conKey)
+ log.Infof("remove the closed connection in analyzer: %s",
conKey)
}
}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
index 7224727..f966743 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
@@ -57,6 +57,13 @@ func (r *Request) MinDataID() int {
return int(r.headerBuffer.FirstSocketBuffer().DataID())
}
+func (r *Request) MaxDataID() int {
+ if r.bodyBuffer != nil {
+ return int(r.bodyBuffer.LastSocketBuffer().DataID())
+ }
+ return int(r.headerBuffer.LastSocketBuffer().DataID())
+}
+
func (r *Request) Original() *http.Request {
return r.original
}
diff --git a/pkg/tools/buffer/buffer.go b/pkg/tools/buffer/buffer.go
index 11ed529..b2c19e9 100644
--- a/pkg/tools/buffer/buffer.go
+++ b/pkg/tools/buffer/buffer.go
@@ -55,6 +55,8 @@ type SocketDataBuffer interface {
BufferLen() int
// DataID data id of the buffer
DataID() uint64
+ // PrevDataID the previous data id of the buffer
+ PrevDataID() uint64
// DataSequence the data sequence under same data id
DataSequence() int
// IsStart this buffer is start of the same data id
@@ -188,6 +190,10 @@ func (p *Position) DataID() uint64 {
return p.element.Value.(SocketDataBuffer).DataID()
}
+func (p *Position) PrevDataID() uint64 {
+ return p.element.Value.(SocketDataBuffer).PrevDataID()
+}
+
func (p *Position) Seq() int {
return p.element.Value.(SocketDataBuffer).DataSequence()
}
@@ -821,6 +827,8 @@ func (r *Buffer) DeleteExpireEvents(expireDuration time.Duration) int {
}
func (r *Buffer) DataLength() int {
+ r.eventLocker.RLock()
+ defer r.eventLocker.RUnlock()
if r.dataEvents == nil {
return 0
}