This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch tmp_disable_reading
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/tmp_disable_reading by this push:
     new 933ce0e  Add max data id support
933ce0e is described below

commit 933ce0e18cdedd1e0a9afe2e1ac51529ccbd70d1
Author: mrproliu <[email protected]>
AuthorDate: Thu Dec 19 16:02:59 2024 +0800

    Add max data id support
---
 pkg/accesslog/collector/protocols/queue.go                       | 9 +++++----
 pkg/accesslog/events/data.go                                     | 6 ++++++
 pkg/profiling/task/network/analyze/events/data.go                | 5 +++++
 .../task/network/analyze/layer7/protocols/base/analyzer.go       | 1 +
 .../network/analyze/layer7/protocols/http1/reader/request.go     | 7 +++++++
 pkg/tools/buffer/buffer.go                                       | 8 ++++++++
 6 files changed, 32 insertions(+), 4 deletions(-)

diff --git a/pkg/accesslog/collector/protocols/queue.go 
b/pkg/accesslog/collector/protocols/queue.go
index d071fe7..37af48d 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -241,8 +241,9 @@ func (p *PartitionContext) Consume(data interface{}) {
                                status = 2
                        }
                }
-               log.Infof("receive the socket data event, connection ID: %d, 
random ID: %d, pid: %d, data id: %d, sequence: %d, protocol: %d, http1 type: 
%d",
-                       event.ConnectionID, event.RandomID, pid, event.DataID0, 
event.Sequence0, event.Protocol0, status)
+               log.Infof("receive the socket data event, connection ID: %d, 
random ID: %d, pid: %d, prev data id: %d, "+
+                       "data id: %d, sequence: %d, protocol: %d, http1 type: 
%d",
+                       event.ConnectionID, event.RandomID, pid, 
event.PrevDataID0, event.DataID0, event.Sequence0, event.Protocol0, status)
                connection := p.getConnectionContext(event.ConnectionID, 
event.RandomID, event.Protocol0, event.DataID0)
                connection.AppendData(event)
        }
@@ -252,7 +253,7 @@ func (p *PartitionContext) 
getConnectionContext(connectionID, randomID uint64,
        protocol enums.ConnectionProtocol, currentDataID uint64) 
*PartitionConnection {
        conKey := p.buildConnectionKey(connectionID, randomID)
        conn, exist := p.connections.Get(conKey)
-       log.Infof("get the connection context, connection ID: %d, random ID: 
%d, partition number: %d, connection exist: %t"+
+       log.Infof("get the connection context, connection ID: %d, random ID: 
%d, partition number: %d, connection exist: %t, "+
                "protoocl: %d, current data ID: %d, partition context: %p",
                connectionID, randomID, p.partitionNum, exist, protocol, 
currentDataID, p)
        if exist {
@@ -301,7 +302,7 @@ func (p *PartitionContext) processEvents() {
                                info.closeCallback()
                        }
                        closedConnections = append(closedConnections, conKey)
-                       log.Debugf("detect the connection is already closed, 
then notify to the callback, connection ID: %d, random ID: %d, partition 
number: %d",
+                       log.Infof("detect the connection is already closed, 
then notify to the callback, connection ID: %d, random ID: %d, partition 
number: %d",
                                info.connectionID, info.randomID, 
p.partitionNum)
                }
        })
diff --git a/pkg/accesslog/events/data.go b/pkg/accesslog/events/data.go
index 83e4ba4..6ecfd2f 100644
--- a/pkg/accesslog/events/data.go
+++ b/pkg/accesslog/events/data.go
@@ -36,6 +36,7 @@ type SocketDataUploadEvent struct {
        ConnectionID uint64
        RandomID     uint64
        DataID0      uint64
+       PrevDataID0  uint64
        TotalSize0   uint64
        Buffer       [2048]byte
 }
@@ -52,6 +53,7 @@ func (s *SocketDataUploadEvent) ReadFrom(r *reader.Reader) {
        s.ConnectionID = r.ReadUint64()
        s.RandomID = r.ReadUint64()
        s.DataID0 = r.ReadUint64()
+       s.PrevDataID0 = r.ReadUint64()
        s.TotalSize0 = r.ReadUint64()
        r.ReadUint8Array(s.Buffer[:], 2048)
 }
@@ -96,6 +98,10 @@ func (s *SocketDataUploadEvent) DataID() uint64 {
        return s.DataID0
 }
 
+func (s *SocketDataUploadEvent) PrevDataID() uint64 {
+       return s.PrevDataID0
+}
+
 func (s *SocketDataUploadEvent) DataSequence() int {
        return int(s.Sequence0)
 }
diff --git a/pkg/profiling/task/network/analyze/events/data.go 
b/pkg/profiling/task/network/analyze/events/data.go
index 1f71fa8..bc451fc 100644
--- a/pkg/profiling/task/network/analyze/events/data.go
+++ b/pkg/profiling/task/network/analyze/events/data.go
@@ -35,6 +35,7 @@ type SocketDataUploadEvent struct {
        ConnectionID uint64
        RandomID     uint64
        DataID0      uint64
+       PrevDataID0  uint64
        TotalSize0   uint64
        Buffer       [2048]byte
 }
@@ -79,6 +80,10 @@ func (s *SocketDataUploadEvent) DataID() uint64 {
        return s.DataID0
 }
 
+func (s *SocketDataUploadEvent) PrevDataID() uint64 {
+       return s.PrevDataID0
+}
+
 func (s *SocketDataUploadEvent) DataSequence() int {
        return int(s.Sequence0)
 }
diff --git 
a/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go 
b/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
index 1acc342..357c2bb 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
@@ -154,6 +154,7 @@ func (a *ProtocolAnalyzer) processEvents() {
 
        for _, conKey := range closedConnections {
                a.connections.Remove(conKey)
+               log.Infof("remove the closed connection in analyzer: %s", 
conKey)
        }
 }
 
diff --git 
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go 
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
index 7224727..f966743 100644
--- 
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
+++ 
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
@@ -57,6 +57,13 @@ func (r *Request) MinDataID() int {
        return int(r.headerBuffer.FirstSocketBuffer().DataID())
 }
 
+func (r *Request) MaxDataID() int {
+       if r.bodyBuffer != nil {
+               return int(r.bodyBuffer.LastSocketBuffer().DataID())
+       }
+       return int(r.headerBuffer.LastSocketBuffer().DataID())
+}
+
 func (r *Request) Original() *http.Request {
        return r.original
 }
diff --git a/pkg/tools/buffer/buffer.go b/pkg/tools/buffer/buffer.go
index 11ed529..b2c19e9 100644
--- a/pkg/tools/buffer/buffer.go
+++ b/pkg/tools/buffer/buffer.go
@@ -55,6 +55,8 @@ type SocketDataBuffer interface {
        BufferLen() int
        // DataID data id of the buffer
        DataID() uint64
+       // PrevDataID the previous data id of the buffer
+       PrevDataID() uint64
        // DataSequence the data sequence under same data id
        DataSequence() int
        // IsStart this buffer is start of the same data id
@@ -188,6 +190,10 @@ func (p *Position) DataID() uint64 {
        return p.element.Value.(SocketDataBuffer).DataID()
 }
 
+func (p *Position) PrevDataID() uint64 {
+       return p.element.Value.(SocketDataBuffer).PrevDataID()
+}
+
 func (p *Position) Seq() int {
        return p.element.Value.(SocketDataBuffer).DataSequence()
 }
@@ -821,6 +827,8 @@ func (r *Buffer) DeleteExpireEvents(expireDuration 
time.Duration) int {
 }
 
 func (r *Buffer) DataLength() int {
+       r.eventLocker.RLock()
+       defer r.eventLocker.RUnlock()
        if r.dataEvents == nil {
                return 0
        }

Reply via email to