This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/main by this push:
     new 8550199  Enhance the protocol reader for support long socket data (#69)
8550199 is described below

commit 8550199e98c9f5a4b2058878a0a899ffb73fe461
Author: mrproliu <[email protected]>
AuthorDate: Thu Jan 5 16:50:47 2023 +0800

    Enhance the protocol reader for support long socket data (#69)
---
 CHANGES.md                                         |  13 +
 bpf/profiling/network/netmonitor.c                 |  28 +-
 bpf/profiling/network/sock_stats.h                 |   2 +-
 .../task/network/analyze/layer7/events.go          |   4 +
 .../task/network/analyze/layer7/listener.go        |   4 +-
 .../analyze/layer7/protocols/base/analyzer.go      | 229 ++++++++++++
 .../analyze/layer7/protocols/base/buffer.go        | 414 +++++++++++++++++++++
 .../analyze/layer7/protocols/base/buffer_test.go   |  94 +++++
 .../analyze/layer7/protocols/base/events.go        | 118 +++---
 .../analyze/layer7/protocols/base/protocol.go      |  16 +-
 .../analyze/layer7/protocols/http1/analyzer.go     | 318 +++++-----------
 .../layer7/protocols/http1/analyzer_test.go        | 310 ---------------
 .../analyze/layer7/protocols/http1/builder.go      | 290 ---------------
 .../analyze/layer7/protocols/http1/metrics.go      | 174 ++-------
 .../layer7/protocols/http1/reader/reader.go        | 298 +++++++++++++++
 .../layer7/protocols/http1/reader/request.go       | 143 +++++++
 .../layer7/protocols/http1/reader/response.go      | 125 +++++++
 .../analyze/layer7/protocols/http1/sampling.go     |  29 +-
 .../network/analyze/layer7/protocols/protocols.go  |  37 +-
 pkg/profiling/task/network/analyze/layer7/queue.go |   6 +-
 pkg/tools/host/time.go                             |   8 +-
 21 files changed, 1562 insertions(+), 1098 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index b80c9f5..b6155ec 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -2,6 +2,19 @@ Changes by Version
 ==================
 Release Notes.
 
+0.5.0
+------------------
+#### Features
+* Enhance the protocol reader for support long socket data.
+
+#### Bug Fixes
+
+#### Documentation
+
+#### Issues and PR
+- All issues are 
[here](https://github.com/apache/skywalking/milestone/167?closed=1)
+- All and pull requests are 
[here](https://github.com/apache/skywalking-rover/milestone/5?closed=1)
+
 0.4.0
 ------------------
 #### Features
diff --git a/bpf/profiling/network/netmonitor.c 
b/bpf/profiling/network/netmonitor.c
index 05e8803..43577ba 100644
--- a/bpf/profiling/network/netmonitor.c
+++ b/bpf/profiling/network/netmonitor.c
@@ -46,7 +46,7 @@ char __license[] SEC("license") = "Dual MIT/GPL";
                val;                                                           \
        })
 
-#define SOCKET_UPLOAD_CHUNK_LIMIT 8
+#define SOCKET_UPLOAD_CHUNK_LIMIT 12
 
 static __inline bool family_should_trace(const __u32 family) {
     return family != AF_UNKNOWN && family != AF_INET && family != AF_INET6 ? 
false : true;
@@ -272,10 +272,11 @@ static __always_inline void resent_connect_event(struct 
pt_regs *ctx, __u32 tgid
     }
 }
 
-static __always_inline void __upload_socket_data_with_buffer(void *ctx, __u8 
index, char* buf, size_t size, __u32 is_finished, struct 
socket_data_upload_event *event) {
+static __always_inline void __upload_socket_data_with_buffer(void *ctx, __u8 
index, char* buf, size_t size, __u32 is_finished, __u8 have_reduce_after_chunk, 
struct socket_data_upload_event *event) {
     event->sequence = index;
     event->data_len = size;
     event->finished = is_finished;
+    event->have_reduce_after_chunk = have_reduce_after_chunk;
     if (size <= 0) {
         return;
     }
@@ -292,8 +293,10 @@ static __always_inline void upload_socket_data_buf(void 
*ctx, char* buf, ssize_t
         // calculate bytes need to send
         ssize_t remaining = size - already_send;
         size_t need_send_in_chunk = 0;
+        __u8 have_reduce_after_chunk = 0;
         if (remaining > MAX_TRANSMIT_SOCKET_READ_LENGTH) {
             need_send_in_chunk = MAX_TRANSMIT_SOCKET_READ_LENGTH;
+            have_reduce_after_chunk = 1;
         } else {
             need_send_in_chunk = remaining;
         }
@@ -304,7 +307,7 @@ static __always_inline void upload_socket_data_buf(void 
*ctx, char* buf, ssize_t
             is_finished = 0;
             sequence = generate_socket_sequence(event->conid, event->data_id);
         }
-        __upload_socket_data_with_buffer(ctx, sequence, buf + already_send, 
need_send_in_chunk, is_finished, event);
+        __upload_socket_data_with_buffer(ctx, sequence, buf + already_send, 
need_send_in_chunk, is_finished, have_reduce_after_chunk, event);
         already_send += need_send_in_chunk;
 
     }
@@ -316,19 +319,22 @@ if (iov_index < iovlen) {                                 
  \
     BPF_PROBE_READ_VAR(cur_iov, &iov[iov_index]);           \
     ssize_t remaining = size - already_send;                \
     size_t need_send_in_chunk = remaining - cur_iov_sended; \
+    __u8 have_reduce_after_chunk = 0;                       \
     if (cur_iov_sended + need_send_in_chunk > cur_iov.iov_len) {            \
         need_send_in_chunk = cur_iov.iov_len - cur_iov_sended;              \
         if (need_send_in_chunk > MAX_TRANSMIT_SOCKET_READ_LENGTH) {         \
             need_send_in_chunk = MAX_TRANSMIT_SOCKET_READ_LENGTH;           \
+            have_reduce_after_chunk = 1;                                    \
         } else {                                                            \
             iov_index++;                                                    \
             cur_iov_sended = 0;                                             \
         }                                                                   \
     } else if (need_send_in_chunk > MAX_TRANSMIT_SOCKET_READ_LENGTH) {      \
         need_send_in_chunk = MAX_TRANSMIT_SOCKET_READ_LENGTH;               \
+        have_reduce_after_chunk = 1;                                        \
     }                                                                       \
-    __u32 is_finished = (need_send_in_chunk + already_send) >= size || 
loop_count == (SOCKET_UPLOAD_CHUNK_LIMIT - 1) ? true : false; \
-    __upload_socket_data_with_buffer(ctx, loop_count, cur_iov.iov_base + 
cur_iov_sended, need_send_in_chunk, is_finished, event);    \
+    __u32 is_finished = (need_send_in_chunk + already_send) >= size || 
loop_count == (SOCKET_UPLOAD_CHUNK_LIMIT - 1) ? true : false;                   
         \
+    __upload_socket_data_with_buffer(ctx, loop_count, cur_iov.iov_base + 
cur_iov_sended, need_send_in_chunk, is_finished, have_reduce_after_chunk, 
event);      \
     already_send += need_send_in_chunk;                                        
                                                      \
     loop_count++;                                                              
                                                      \
 }
@@ -365,23 +371,11 @@ static __inline void upload_socket_data(void *ctx, __u64 
start_time, __u64 end_t
     if (connection->ssl != ssl) {
         return;
     }
-    // if the msg type is unknown, then try to re-analysis
-    if (existing_msg_type == CONNECTION_MESSAGE_TYPE_UNKNOWN) {
-        struct socket_buffer_reader_t *buf_reader = read_socket_data(args, 
bytes_count);
-        if (buf_reader == NULL) {
-            return;
-        }
-        existing_msg_type = analyze_protocol(buf_reader->buffer, 
buf_reader->data_len, connection);
-        if (existing_msg_type == CONNECTION_MESSAGE_TYPE_UNKNOWN && 
args->ssl_buffer_force_unfinished == 0) {
-            return;
-        }
-    }
 
     // basic data
     event->start_time = start_time;
     event->end_time = end_time;
     event->protocol = connection->protocol;
-    event->msg_type = existing_msg_type;
     event->direction = data_direction;
     event->conid = conid;
     event->randomid = connection->random_id;
diff --git a/bpf/profiling/network/sock_stats.h 
b/bpf/profiling/network/sock_stats.h
index 84a46eb..8312af3 100644
--- a/bpf/profiling/network/sock_stats.h
+++ b/bpf/profiling/network/sock_stats.h
@@ -258,7 +258,7 @@ struct {
 
 struct socket_data_upload_event {
     __u8 protocol;
-    __u8 msg_type;
+    __u8 have_reduce_after_chunk;
     __u8 direction;
     __u8 finished;
     __u16 sequence;
diff --git a/pkg/profiling/task/network/analyze/layer7/events.go 
b/pkg/profiling/task/network/analyze/layer7/events.go
index 9096913..5590dce 100644
--- a/pkg/profiling/task/network/analyze/layer7/events.go
+++ b/pkg/profiling/task/network/analyze/layer7/events.go
@@ -61,6 +61,10 @@ func NewSocketDataPartitionContext(l base.Context, config 
*profiling.TaskConfig)
        }
 }
 
+func (p *SocketDataPartitionContext) Start(ctx context.Context) {
+       p.analyzer.Start(ctx)
+}
+
 func (p *SocketDataPartitionContext) Consume(data interface{}) {
        event := data.(*base.SocketDataUploadEvent)
        p.analyzer.ReceiveSocketDataEvent(event)
diff --git a/pkg/profiling/task/network/analyze/layer7/listener.go 
b/pkg/profiling/task/network/analyze/layer7/listener.go
index 8edb532..61f32bd 100644
--- a/pkg/profiling/task/network/analyze/layer7/listener.go
+++ b/pkg/profiling/task/network/analyze/layer7/listener.go
@@ -132,9 +132,9 @@ func (l *Listener) QueryConnection(conID, randomID uint64) 
*base.ConnectionConte
        return nil
 }
 
-func (l *Listener) QueryProtocolMetrics(conMetrics 
*base.ConnectionMetricsContext, protocolName string) protocol.Metrics {
+func (l *Listener) QueryProtocolMetrics(conMetrics 
*base.ConnectionMetricsContext, p base.ConnectionProtocol) protocol.Metrics {
        metrics := 
conMetrics.GetMetrics(ListenerName).(*protocols.ProtocolMetrics)
-       return metrics.GetProtocolMetrics(protocolName)
+       return metrics.GetProtocolMetrics(p)
 }
 
 func (l *Listener) generateCachedConnectionKey(conID, randomID uint64) string {
diff --git 
a/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go 
b/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
new file mode 100644
index 0000000..0abc173
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
@@ -0,0 +1,229 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package base
+
+import (
+       "context"
+       "sync"
+       "time"
+
+       "github.com/apache/skywalking-rover/pkg/logger"
+       profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
+       
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
+)
+
+const (
+       batchReadMinCount = 1000
+)
+
+var log = logger.GetLogger("profiling", "task", "network", "layer7", 
"protocols", "base")
+
+// ProtocolAnalyzer handler all socket data for each protocol
+type ProtocolAnalyzer struct {
+       protocolContext Context
+       protocol        Protocol
+       config          *profiling.TaskConfig
+
+       connections       map[connectionKey]*connectionInfo
+       analyzeLocker     sync.Mutex
+       receiveEventCount int
+}
+
+func NewProtocolAnalyzer(protocolContext Context, p Protocol, config 
*profiling.TaskConfig) *ProtocolAnalyzer {
+       return &ProtocolAnalyzer{
+               protocolContext: protocolContext,
+               protocol:        p,
+               config:          config,
+               connections:     make(map[connectionKey]*connectionInfo),
+       }
+}
+
+func (a *ProtocolAnalyzer) Start(ctx context.Context) {
+       duration, _ := time.ParseDuration(a.config.Network.ReportInterval)
+       timeTicker := time.NewTicker(duration)
+       go func() {
+               for {
+                       select {
+                       case <-timeTicker.C:
+                               // process event with interval
+                               a.processEvents()
+                       case <-ctx.Done():
+                               timeTicker.Stop()
+                               return
+                       }
+               }
+       }()
+
+       // if the protocol defined the events expire time, then check events 
interval
+       expireDuration := a.protocol.PackageMaxExpireDuration()
+       if expireDuration.Milliseconds() > 0 {
+               expireTicker := time.NewTicker(expireDuration)
+               go func() {
+                       for {
+                               select {
+                               case <-expireTicker.C:
+                                       a.processExpireEvents(expireDuration)
+                               case <-ctx.Done():
+                                       expireTicker.Stop()
+                                       return
+                               }
+                       }
+               }()
+       }
+}
+
+func (a *ProtocolAnalyzer) ReceiveSocketData(ctx Context, event 
*SocketDataUploadEvent) {
+       connectionID := event.GenerateConnectionID()
+       key := connectionKey{connectionID: event.ConnectionID, randomID: 
event.RandomID}
+       connection := a.connections[key]
+       if connection == nil {
+               connection = newConnectionInfo(a.protocol, ctx, 
key.connectionID, key.randomID)
+               a.connections[key] = connection
+       }
+       connection.checkConnectionMetrics(ctx)
+
+       log.Debugf("receive connection: %s, dataid: %d, sequence: %d, finished: 
%d, have reduce after chunk: %t, direction: %s, size: %d, total size: %d",
+               connectionID, event.DataID(), event.DataSequence(), 
event.Finished, event.HaveReduceDataAfterChunk(),
+               event.Direction().String(), event.DataLen, event.TotalSize0)
+
+       // insert to the event list
+       connection.buffer.appendEvent(event)
+
+       // process the events if reach the receiver counter
+       a.receiveEventCount++
+       if a.receiveEventCount >= batchReadMinCount {
+               a.processEvents()
+       }
+       a.receiveEventCount = 0
+}
+
+// processEvents means analyze the protocol in each connection
+func (a *ProtocolAnalyzer) processEvents() {
+       // it could be triggered by interval or reach counter
+       // if any trigger bean locked, the other one just ignore process
+       if !a.analyzeLocker.TryLock() {
+               return
+       }
+       defer a.analyzeLocker.Unlock()
+
+       for _, connection := range a.connections {
+               a.processConnectionEvents(connection)
+       }
+}
+
+// processExpireEvents delete the expired events
+func (a *ProtocolAnalyzer) processExpireEvents(expireDuration time.Duration) {
+       // the expiry must be mutual exclusion with events processor
+       a.analyzeLocker.Lock()
+       defer a.analyzeLocker.Unlock()
+
+       for _, connection := range a.connections {
+               a.processConnectionExpireEvents(connection, expireDuration)
+       }
+}
+
+func (a *ProtocolAnalyzer) processConnectionEvents(connection *connectionInfo) 
{
+       // reset the status for prepare reading
+       buffer := connection.buffer
+       metrics := connection.metrics
+       connectionID := connection.connectionID
+       buffer.resetForLoopReading()
+       // loop to read the protocol data
+       for {
+               // reset the status of reading
+               if !buffer.prepareForReading() {
+                       log.Debugf("prepare finsihed: event size: %d", 
buffer.events.Len())
+                       return
+               }
+
+               result := a.protocol.ParseProtocol(connectionID, metrics, 
buffer)
+               finishReading := false
+               switch result {
+               case ParseResultSuccess:
+                       finishReading = buffer.removeReadElements()
+               case ParseResultSkipPackage:
+                       finishReading = buffer.skipCurrentElement()
+               }
+
+               if finishReading {
+                       log.Debugf("reading finsihed: event size: %d", 
buffer.events.Len())
+                       break
+               }
+       }
+}
+
+func (a *ProtocolAnalyzer) processConnectionExpireEvents(connection 
*connectionInfo, expireDuration time.Duration) {
+       if c := connection.buffer.deleteExpireEvents(expireDuration); c > 0 {
+               log.Debugf("total removed %d expired events for %s protocol", 
c, a.protocol.Protocol().String())
+       }
+}
+
+func (a *ProtocolAnalyzer) UpdateExtensionConfig(config 
*profiling.ExtensionConfig) {
+       a.protocol.UpdateExtensionConfig(config)
+}
+
+type connectionKey struct {
+       connectionID uint64
+       randomID     uint64
+}
+
+type connectionInfo struct {
+       connectionID, randomID uint64
+       connectionProtocol     base.ConnectionProtocol
+       buffer                 *Buffer
+       metrics                Metrics
+       metricsFromConnection  bool
+}
+
+func newConnectionInfo(p Protocol, connectionContext Context, connectionID, 
randomID uint64) *connectionInfo {
+       fromConnection := false
+       var connectionMetrics Metrics
+       con := connectionContext.QueryConnection(connectionID, randomID)
+       // if connection not exists, then cached it into the analyzer context
+       if con == nil {
+               connectionMetrics = p.GenerateMetrics()
+       } else {
+               connectionMetrics = 
connectionContext.QueryProtocolMetrics(con.Metrics, p.Protocol())
+               fromConnection = true
+       }
+
+       return &connectionInfo{
+               connectionID:          connectionID,
+               randomID:              randomID,
+               connectionProtocol:    p.Protocol(),
+               buffer:                newBuffer(),
+               metrics:               connectionMetrics,
+               metricsFromConnection: fromConnection,
+       }
+}
+
+func (c *connectionInfo) checkConnectionMetrics(protocolContext Context) {
+       if c.metricsFromConnection {
+               return
+       }
+       connection := protocolContext.QueryConnection(c.connectionID, 
c.randomID)
+       if connection == nil {
+               return
+       }
+
+       // merge the temporary metrics into the connection metrics
+       connectionMetrics := 
protocolContext.QueryProtocolMetrics(connection.Metrics, c.connectionProtocol)
+       connectionMetrics.MergeMetricsFromConnection(connection, c.metrics)
+       c.metrics = connectionMetrics
+       c.metricsFromConnection = true
+}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/base/buffer.go 
b/pkg/profiling/task/network/analyze/layer7/protocols/base/buffer.go
new file mode 100644
index 0000000..c7b2647
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/buffer.go
@@ -0,0 +1,414 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package base
+
+import (
+       "container/list"
+       "errors"
+       "fmt"
+       "io"
+       "sync"
+       "time"
+
+       "github.com/apache/skywalking-rover/pkg/tools/host"
+)
+
+var (
+       ErrNotComplete = errors.New("socket: not complete event")
+)
+
+type Buffer struct {
+       events    *list.List
+       validated bool // the events list is validated or not
+
+       eventLocker sync.RWMutex
+
+       head    *BufferPosition
+       current *BufferPosition
+}
+
+type BufferPosition struct {
+       // element of the event list
+       element *list.Element
+       // bufIndex the buffer index of the element
+       bufIndex int
+}
+
+func (p *BufferPosition) String() string {
+       buffer := p.element.Value.(SocketDataBuffer)
+       return fmt.Sprintf("data id: %d, sequence: %d, buffer index: %d",
+               buffer.DataID(), buffer.DataSequence(), p.bufIndex)
+}
+
+func newBuffer() *Buffer {
+       return &Buffer{
+               events:    list.New(),
+               validated: false,
+       }
+}
+
+func (r *Buffer) Position() *BufferPosition {
+       return r.current.Clone()
+}
+
+func (r *Buffer) Slice(validated bool, start, end *BufferPosition) *Buffer {
+       events := list.New()
+       for nextElement := start.element; nextElement != end.element; 
nextElement = nextElement.Next() {
+               events.PushBack(nextElement.Value)
+       }
+       
events.PushBack(&SocketDataEventLimited{end.element.Value.(SocketDataBuffer), 
0, end.bufIndex})
+
+       return &Buffer{
+               events:    events,
+               validated: validated,
+               head:      &BufferPosition{element: events.Front(), bufIndex: 
start.bufIndex},
+               current:   &BufferPosition{element: events.Front(), bufIndex: 
start.bufIndex},
+       }
+}
+
+func (r *Buffer) Len() int {
+       if r.head == nil {
+               return 0
+       }
+       var result int
+       var startIndex = r.head.bufIndex
+       for e := r.head.element; e != nil; e = e.Next() {
+               result += r.head.element.Value.(SocketDataBuffer).BufferLen() - 
startIndex
+               startIndex = 0
+       }
+       return result
+}
+
+func (r *Buffer) FirstSocketBuffer() SocketDataBuffer {
+       if r.events.Len() == 0 {
+               return nil
+       }
+       return r.events.Front().Value.(SocketDataBuffer)
+}
+
+func (r *Buffer) LastSocketBuffer() SocketDataBuffer {
+       if r.events.Len() == 0 {
+               return nil
+       }
+       return r.events.Back().Value.(SocketDataBuffer)
+}
+
+// DetectNotSendingLastPosition detect the buffer contains not sending data: 
the BPF limited socket data count
+func (r *Buffer) DetectNotSendingLastPosition() *BufferPosition {
+       if r.events.Len() == 0 {
+               return nil
+       }
+
+       for e := r.events.Front(); e != nil; e = e.Next() {
+               buf := e.Value.(SocketDataBuffer)
+               // the buffer is sent finished but still have reduced data not 
send
+               if buf.IsFinished() && buf.HaveReduceDataAfterChunk() {
+                       return &BufferPosition{element: e, bufIndex: 
buf.BufferLen()}
+               }
+       }
+       return nil
+}
+
+func CombineSlices(validated bool, buffers ...*Buffer) *Buffer {
+       if len(buffers) == 0 {
+               return nil
+       }
+       if len(buffers) == 1 {
+               return buffers[0]
+       }
+       events := list.New()
+       for _, b := range buffers {
+               if b.head.bufIndex > 0 {
+                       headBuffer := b.events.Front().Value.(SocketDataBuffer)
+                       events.PushBack(&SocketDataEventLimited{headBuffer, 
b.head.bufIndex, headBuffer.BufferLen()})
+                       for next := b.events.Front().Next(); next != nil; next 
= next.Next() {
+                               events.PushBack(next.Value)
+                       }
+               } else {
+                       events.PushBackList(b.events)
+               }
+       }
+       return &Buffer{
+               events:    events,
+               validated: validated,
+               head:      &BufferPosition{element: events.Front(), bufIndex: 
0},
+               current:   &BufferPosition{element: events.Front(), bufIndex: 
0},
+       }
+}
+
+func (r *Buffer) Peek(p []byte) (n int, err error) {
+       // save the index temporary
+       tmpPosition := r.current.Clone()
+       // restore the index
+       defer func() {
+               r.current = tmpPosition
+       }()
+       readIndex := 0
+       for readIndex < len(p) {
+               count, err := r.Read(p[readIndex:])
+               if err != nil {
+                       return 0, err
+               }
+               readIndex += count
+       }
+       return readIndex, nil
+}
+
+func (r *Buffer) OffsetPosition(offset int) *BufferPosition {
+       var nextElement func(e *list.Element) *list.Element
+       if offset == 0 {
+               return r.current.Clone()
+       } else if offset > 0 {
+               nextElement = func(e *list.Element) *list.Element {
+                       return e.Next()
+               }
+       } else {
+               nextElement = func(e *list.Element) *list.Element {
+                       return e.Prev()
+               }
+       }
+
+       var curEle = r.current.element
+       var curIndex = r.current.bufIndex
+       for ; curEle != nil; curEle = nextElement(curEle) {
+               nextOffset := curIndex + offset
+               bufferLen := curEle.Value.(SocketDataBuffer).BufferLen()
+               if nextOffset >= 0 && nextOffset < bufferLen {
+                       curIndex += offset
+                       break
+               }
+
+               if offset > 0 {
+                       offset -= bufferLen - curIndex
+                       curIndex = 0
+               } else {
+                       offset += curIndex
+                       next := nextElement(curEle)
+                       if next == nil {
+                               curEle = next
+                               break
+                       }
+                       curIndex = curEle.Value.(SocketDataBuffer).BufferLen()
+               }
+       }
+
+       if curEle == nil {
+               return nil
+       }
+       return &BufferPosition{element: curEle, bufIndex: curIndex}
+}
+
+func (r *Buffer) Read(p []byte) (n int, err error) {
+       if len(p) == 0 {
+               return 0, nil
+       }
+       if r.current == nil || r.current.element == nil {
+               return 0, io.EOF
+       }
+       element, n := r.readFromCurrent(p)
+       if n > 0 {
+               return n, nil
+       }
+
+       curEvent := element.Value.(SocketDataBuffer)
+       next := r.nextElement(element)
+       if next == nil {
+               return 0, io.EOF
+       }
+       nextEvent := next.Value.(SocketDataBuffer)
+
+       var shouldRead = false
+       if r.validated {
+               shouldRead = true
+               // same data id and sequence orders
+       } else if (curEvent.DataID() == nextEvent.DataID() && 
curEvent.DataSequence()+1 == nextEvent.DataSequence()) ||
+               // cur event is finished and next event is start
+               (nextEvent.IsStart() && curEvent.IsFinished()) ||
+               // same data id and sequence but have difference buffer index
+               (curEvent.DataID() == nextEvent.DataID() && 
curEvent.DataSequence() == nextEvent.DataSequence() &&
+                       r.current.bufIndex <= nextEvent.BufferStartPosition()) {
+               shouldRead = true
+       }
+
+       if !shouldRead {
+               return 0, ErrNotComplete
+       }
+
+       return r.read0(next, nextEvent, p)
+}
+
+func (r *Buffer) readFromCurrent(p []byte) (element *list.Element, n int) {
+       element = r.current.element
+       curEvent := element.Value.(SocketDataBuffer)
+       residueSize := curEvent.BufferLen() - r.current.bufIndex
+       if residueSize > 0 {
+               readLen := len(p)
+               if residueSize < readLen {
+                       readLen = residueSize
+               }
+
+               n = copy(p, 
curEvent.BufferData()[r.current.bufIndex:r.current.bufIndex+readLen])
+               r.current.bufIndex += n
+               return element, n
+       }
+       return element, 0
+}
+
+func (r *Buffer) read0(currentElement *list.Element, currentBuffer 
SocketDataBuffer, p []byte) (n int, err error) {
+       readLen := len(p)
+       if currentBuffer.BufferLen() < readLen {
+               readLen = currentBuffer.BufferLen()
+       }
+
+       copy(p, currentBuffer.BufferData()[:readLen])
+       r.current.element = currentElement
+       r.current.bufIndex = readLen
+       return readLen, nil
+}
+
+// IsCurrentPacketReadFinished means to validate the current reading package 
is reading finished
+func (r *Buffer) IsCurrentPacketReadFinished() bool {
+       return r.current.bufIndex == 
r.current.element.Value.(SocketDataBuffer).BufferLen()
+}
+
+func (r *Buffer) resetForLoopReading() {
+       r.head = nil
+       r.current = nil
+}
+
+func (r *Buffer) prepareForReading() bool {
+       if r.events.Len() == 0 {
+               return false
+       }
+       if r.head == nil || r.head.element == nil {
+               // read in the first element
+               r.eventLocker.RLock()
+               defer r.eventLocker.RUnlock()
+               r.head = &BufferPosition{element: r.events.Front(), bufIndex: 0}
+               r.current = r.head.Clone()
+       } else {
+               // make sure we can read from head
+               r.current = r.head.Clone()
+       }
+
+       return true
+}
+
+func (r *Buffer) removeReadElements() bool {
+       r.eventLocker.Lock()
+       defer r.eventLocker.Unlock()
+
+       // delete until to current position
+       next := r.head.element
+       for ; next != nil && next != r.current.element; next = 
r.removeElement0(next) {
+       }
+       if next != nil && next.Value.(SocketDataBuffer).BufferLen() == 
r.current.bufIndex {
+               // the last event already read finished, then delete it
+               r.head.element = r.removeElement0(next)
+               r.head.bufIndex = 0
+       } else if next != nil {
+               // keep using the latest element
+               r.head.element = next
+       } else {
+               return true
+       }
+       return false
+}
+
+// skipCurrentElement skip current element in reader, if return true means 
have read finished
+func (r *Buffer) skipCurrentElement() bool {
+       r.head.element = r.nextElement(r.current.element)
+       r.current.bufIndex = 0
+
+       return r.head.element == nil
+}
+
+func (r *Buffer) removeElement0(element *list.Element) *list.Element {
+       if element == nil {
+               return nil
+       }
+       result := element.Next()
+       r.events.Remove(element)
+       return result
+}
+
+// appendEvent insert the event to the event list following the order
+func (r *Buffer) appendEvent(event *SocketDataUploadEvent) {
+       r.eventLocker.Lock()
+       defer r.eventLocker.Unlock()
+
+       if r.events.Len() == 0 {
+               r.events.PushFront(event)
+               return
+       }
+       if r.events.Back().Value.(SocketDataBuffer).DataID() < event.DataID() {
+               r.events.PushBack(event)
+               return
+       }
+       beenAdded := false
+       for element := r.events.Front(); element != nil; element = 
element.Next() {
+               existEvent := element.Value.(SocketDataBuffer)
+               if existEvent.DataID() > event.DataID() {
+                       // data id needs order
+                       beenAdded = true
+               } else if existEvent.DataID() == event.DataID() && 
existEvent.DataSequence() > event.DataSequence() {
+                       // following the sequence order
+                       beenAdded = true
+               }
+               if beenAdded {
+                       r.events.InsertBefore(event, element)
+                       break
+               }
+       }
+       if !beenAdded {
+               r.events.PushBack(event)
+       }
+}
+
+func (r *Buffer) deleteExpireEvents(expireDuration time.Duration) int {
+       r.eventLocker.Lock()
+       defer r.eventLocker.Unlock()
+
+       expireTime := time.Now().Add(-expireDuration)
+       count := 0
+       for e := r.events.Front(); e != nil; {
+               startTime := host.Time(e.Value.(SocketDataBuffer).StartTime())
+               if expireTime.After(startTime) {
+                       count++
+                       cur := e
+                       e = e.Next()
+                       r.events.Remove(cur)
+               } else {
+                       break
+               }
+       }
+       return count
+}
+
+func (r *Buffer) nextElement(e *list.Element) *list.Element {
+       if e == nil {
+               return nil
+       }
+       r.eventLocker.RLock()
+       defer r.eventLocker.RUnlock()
+       return e.Next()
+}
+
+func (p *BufferPosition) Clone() *BufferPosition {
+       return &BufferPosition{element: p.element, bufIndex: p.bufIndex}
+}
diff --git 
a/pkg/profiling/task/network/analyze/layer7/protocols/base/buffer_test.go 
b/pkg/profiling/task/network/analyze/layer7/protocols/base/buffer_test.go
new file mode 100644
index 0000000..87984b2
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/buffer_test.go
@@ -0,0 +1,94 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package base
+
+import (
+       "container/list"
+       "testing"
+)
+
+func TestOffsetPosition(t *testing.T) {
+       type position struct {
+               eventIndex  int
+               bufferIndex int
+       }
+       var tests = []struct {
+               events  []int
+               current position
+               offset  int
+               result  *position
+       }{
+               {
+                       events:  []int{10, 10, 10},
+                       current: position{0, 0},
+                       offset:  10,
+                       result:  &position{1, 0},
+               },
+               {
+                       events:  []int{10, 10, 10},
+                       current: position{1, 0},
+                       offset:  -10,
+                       result:  &position{0, 0},
+               },
+               {
+                       events:  []int{10, 10, 10},
+                       current: position{2, 5},
+                       offset:  -10,
+                       result:  &position{1, 5},
+               },
+               {
+                       events:  []int{10, 10, 10},
+                       current: position{2, 5},
+                       offset:  -20,
+                       result:  &position{0, 5},
+               },
+               {
+                       events:  []int{10, 10, 10},
+                       current: position{2, 5},
+                       offset:  10,
+                       result:  nil,
+               },
+       }
+
+       for _, test := range tests {
+               events := list.New()
+               buffer := Buffer{events: events}
+               var curElement *list.Element
+               for i, e := range test.events {
+                       element := events.PushBack(&SocketDataUploadEvent{
+                               DataID0: uint64(i),
+                               DataLen: uint16(e),
+                       })
+                       if i == test.current.eventIndex {
+                               curElement = element
+                       }
+               }
+
+               buffer.prepareForReading()
+               buffer.current = &BufferPosition{element: curElement, bufIndex: 
test.current.bufferIndex}
+               offsetPosition := buffer.OffsetPosition(test.offset)
+               if offsetPosition == nil && test.result == nil {
+                       continue
+               }
+               if 
int(offsetPosition.element.Value.(*SocketDataUploadEvent).DataID()) != 
test.result.eventIndex ||
+                       offsetPosition.bufIndex != test.result.bufferIndex {
+                       t.Fatalf("excepted: %d,%d, actual: %d,%d", 
test.result.eventIndex, test.result.bufferIndex,
+                               
offsetPosition.element.Value.(*SocketDataUploadEvent).DataID(), 
offsetPosition.bufIndex)
+               }
+       }
+}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/base/events.go 
b/pkg/profiling/task/network/analyze/layer7/protocols/base/events.go
index e3f09f0..52f886d 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/base/events.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/events.go
@@ -24,35 +24,45 @@ import (
 )
 
 type SocketDataBuffer interface {
-       // Combine from other buffer
-       Combine(buffered SocketDataBuffer) SocketDataBuffer
        // BufferData of the buffer
        BufferData() []byte
        // TotalSize of socket data, the data may exceed the size of the 
BufferData()
        TotalSize() uint64
        // Direction of the data, send or receive
        Direction() base.SocketDataDirection
-       FirstEvent() *SocketDataUploadEvent
-
+       // BufferStartPosition the buffer start index
+       BufferStartPosition() int
+       // BufferLen the buffer data length
+       BufferLen() int
+       // DataID data id of the buffer
+       DataID() uint64
+       // DataSequence the data sequence under same data id
+       DataSequence() int
+       // IsStart this buffer is start of the same data id
+       IsStart() bool
+       // IsFinished this buffer is finish of the same data id
+       IsFinished() bool
+       // HaveReduceDataAfterChunk check have reduced data after current buffer
+       HaveReduceDataAfterChunk() bool
+
+       // StartTime the data start timestamp
        StartTime() uint64
+       // EndTime the data end timestamp
        EndTime() uint64
-
-       MinDataID() int
-       MaxDataID() int
 }
 
 type SocketDataUploadEvent struct {
        Protocol     base.ConnectionProtocol
-       MsgType      base.SocketMessageType
+       HaveReduce   uint8
        Direction0   base.SocketDataDirection
        Finished     uint8
-       Sequence     uint16
+       Sequence0    uint16
        DataLen      uint16
        StartTime0   uint64
        EndTime0     uint64
        ConnectionID uint64
        RandomID     uint64
-       DataID       uint64
+       DataID0      uint64
        TotalSize0   uint64
        Buffer       [2048]byte
 }
@@ -65,6 +75,10 @@ func (s *SocketDataUploadEvent) BufferData() []byte {
        return s.Buffer[:s.DataLen]
 }
 
+func (s *SocketDataUploadEvent) BufferLen() int {
+       return int(s.DataLen)
+}
+
 func (s *SocketDataUploadEvent) StartTime() uint64 {
        return s.StartTime0
 }
@@ -77,90 +91,48 @@ func (s *SocketDataUploadEvent) Direction() 
base.SocketDataDirection {
        return s.Direction0
 }
 
-func (s *SocketDataUploadEvent) FirstEvent() *SocketDataUploadEvent {
-       return s
-}
-
-func (s *SocketDataUploadEvent) MinDataID() int {
-       return int(s.DataID)
-}
-
-func (s *SocketDataUploadEvent) MaxDataID() int {
-       return int(s.DataID)
-}
-
 func (s *SocketDataUploadEvent) IsStart() bool {
-       return s.Sequence == 0
+       return s.Sequence0 == 0
 }
 
 func (s *SocketDataUploadEvent) IsFinished() bool {
        return s.Finished == 1
 }
 
-func (s *SocketDataUploadEvent) Combine(other SocketDataBuffer) 
SocketDataBuffer {
-       combined := &SocketDataUploadCombinedEvent{first: s}
-       combined.realBuffer = append(s.Buffer[:s.DataLen], 
other.BufferData()...)
-       combined.minDataID = int(s.DataID)
-       if other.MinDataID() < combined.minDataID {
-               combined.minDataID = other.MinDataID()
-       }
-       combined.maxDataID = int(s.DataID)
-       if other.MaxDataID() > combined.maxDataID {
-               combined.maxDataID = other.MaxDataID()
-       }
-       return combined
+func (s *SocketDataUploadEvent) DataID() uint64 {
+       return s.DataID0
 }
 
-func (s *SocketDataUploadEvent) TotalSize() uint64 {
-       return s.TotalSize0
-}
-
-type SocketDataUploadCombinedEvent struct {
-       first      *SocketDataUploadEvent
-       realBuffer []byte
-       minDataID  int
-       maxDataID  int
-}
-
-func (s *SocketDataUploadCombinedEvent) BufferData() []byte {
-       return s.realBuffer
-}
-
-func (s *SocketDataUploadCombinedEvent) TotalSize() uint64 {
-       return s.first.TotalSize0
+func (s *SocketDataUploadEvent) DataSequence() int {
+       return int(s.Sequence0)
 }
 
-func (s *SocketDataUploadCombinedEvent) StartTime() uint64 {
-       return s.first.StartTime0
+func (s *SocketDataUploadEvent) BufferStartPosition() int {
+       return 0
 }
 
-func (s *SocketDataUploadCombinedEvent) EndTime() uint64 {
-       return s.first.EndTime0
+func (s *SocketDataUploadEvent) TotalSize() uint64 {
+       return s.TotalSize0
 }
 
-func (s *SocketDataUploadCombinedEvent) MinDataID() int {
-       return s.minDataID
+func (s *SocketDataUploadEvent) HaveReduceDataAfterChunk() bool {
+       return s.HaveReduce == 1
 }
 
-func (s *SocketDataUploadCombinedEvent) MaxDataID() int {
-       return s.maxDataID
+type SocketDataEventLimited struct {
+       SocketDataBuffer
+       from int
+       size int
 }
 
-func (s *SocketDataUploadCombinedEvent) Direction() base.SocketDataDirection {
-       return s.first.Direction0
+func (s *SocketDataEventLimited) BufferData() []byte {
+       return s.SocketDataBuffer.BufferData()[s.from:s.size]
 }
 
-func (s *SocketDataUploadCombinedEvent) FirstEvent() *SocketDataUploadEvent {
-       return s.first
+func (s *SocketDataEventLimited) BufferLen() int {
+       return s.size - s.from
 }
 
-func (s *SocketDataUploadCombinedEvent) Combine(other SocketDataBuffer) 
SocketDataBuffer {
-       s.realBuffer = append(s.realBuffer, other.BufferData()...)
-       if other.MinDataID() < s.minDataID {
-               s.minDataID = other.MinDataID()
-       }
-       if other.MaxDataID() > s.maxDataID {
-               s.maxDataID = other.MaxDataID()
-       }
-       return s
+func (s *SocketDataEventLimited) BufferStartPosition() int {
+       return s.from
 }
diff --git 
a/pkg/profiling/task/network/analyze/layer7/protocols/base/protocol.go 
b/pkg/profiling/task/network/analyze/layer7/protocols/base/protocol.go
index 8afb92e..af401e1 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/base/protocol.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/protocol.go
@@ -18,22 +18,32 @@
 package base
 
 import (
+       "time"
+
        profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
        
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
 )
 
+type ParseResult int
+
+const (
+       ParseResultSuccess ParseResult = iota
+       ParseResultSkipPackage
+)
+
 type Protocol interface {
-       Name() string
+       Protocol() base.ConnectionProtocol
        GenerateMetrics() Metrics
        Init(config *profiling.TaskConfig)
 
-       ReceiveData(context Context, event *SocketDataUploadEvent) bool
+       ParseProtocol(connectionID uint64, metrics Metrics, reader *Buffer) 
ParseResult
+       PackageMaxExpireDuration() time.Duration
        UpdateExtensionConfig(config *profiling.ExtensionConfig)
 }
 
 type Context interface {
        QueryConnection(connectionID, randomID uint64) *base.ConnectionContext
-       QueryProtocolMetrics(conMetrics *base.ConnectionMetricsContext, 
protocolName string) Metrics
+       QueryProtocolMetrics(conMetrics *base.ConnectionMetricsContext, 
protocol base.ConnectionProtocol) Metrics
 }
 
 type Metrics interface {
diff --git 
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go 
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go
index 499f803..c9b45fb 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go
@@ -18,30 +18,22 @@
 package http1
 
 import (
-       "bufio"
-       "bytes"
        "container/list"
        "encoding/json"
-       "fmt"
-       "io"
-       "net/http"
-       "net/textproto"
-       "strconv"
-       "strings"
        "sync"
+       "time"
 
        "github.com/apache/skywalking-rover/pkg/logger"
        profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
        
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
        protocol 
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
+       
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader"
 
        "github.com/sirupsen/logrus"
 )
 
 var log = logger.GetLogger("profiling", "task", "network", "layer7", 
"protocols", "http1")
 
-var ProtocolName = "http1"
-
 var PackageSizeHistogramBuckets = []float64{
        // 0.25KB, 0.5KB, 1KB, 1.5KB, 2KB, 3KB, 5KB, 8KB, 10KB, 15KB, 20KB, 
35KB, 50KB, 75KB, 100KB, 200KB, 500KB
        256, 512, 1048, 1536, 2048, 3072, 5120, 8192, 10240, 15360, 20480, 
35840, 51200, 76800, 102400, 204800, 512000,
@@ -77,8 +69,8 @@ func NewHTTP1Analyzer() protocol.Protocol {
        }
 }
 
-func (h *Analyzer) Name() string {
-       return ProtocolName
+func (h *Analyzer) Protocol() base.ConnectionProtocol {
+       return base.ConnectionProtocolHTTP
 }
 
 func (h *Analyzer) GenerateMetrics() protocol.Metrics {
@@ -94,260 +86,146 @@ func (h *Analyzer) Init(config *profiling.TaskConfig) {
        h.sampleConfig = NewSamplingConfig(config)
 }
 
-func (h *Analyzer) ReceiveData(context protocol.Context, event 
*protocol.SocketDataUploadEvent) bool {
-       // only handle the HTTP1 protocol
-       if event.Protocol != base.ConnectionProtocolHTTP {
-               return false
-       }
-
-       connectionID := event.GenerateConnectionID()
-       fromAnalyzerCache := false
-       var connectionMetrics *ConnectionMetrics
-       connection := context.QueryConnection(event.ConnectionID, 
event.RandomID)
-       // if connection not exists, then cached it into the analyzer context
-       if connection == nil {
-               connectionMetrics = h.cache[connectionID]
-               fromAnalyzerCache = true
-               if connectionMetrics == nil {
-                       connectionMetrics = 
h.GenerateMetrics().(*ConnectionMetrics)
-                       h.cache[connectionID] = connectionMetrics
-               }
-       } else {
-               connectionMetrics = 
context.QueryProtocolMetrics(connection.Metrics, 
ProtocolName).(*ConnectionMetrics)
-       }
-
-       log.Debugf("receive connection: %s, dataid: %d, sequence: %d, finished: 
%d, message type: %s, direction: %s, size: %d, total size: %d",
-               connectionID, event.DataID, event.Sequence, event.Finished, 
event.MsgType.String(), event.Direction().String(), event.DataLen, 
event.TotalSize0)
-       // if the cache is existing in the analyzer context, then delete it
-       if !fromAnalyzerCache {
-               if tmp := h.cache[connectionID]; tmp != nil {
-                       connectionMetrics.MergeFrom(h, tmp)
-                       delete(h.cache, connectionID)
-               }
+func (h *Analyzer) ParseProtocol(connectionID uint64, metrics 
protocol.Metrics, buf *protocol.Buffer) protocol.ParseResult {
+       connectionMetrics := metrics.(*ConnectionMetrics)
+       messageType, err := reader.IdentityMessageType(buf)
+       if err != nil {
+               return protocol.ParseResultSkipPackage
        }
 
-       req, resp := h.buildHTTP1(connectionMetrics.halfData, event)
-       if req != nil && resp != nil {
-               if err := h.analyze(context, connectionID, connectionMetrics, 
req, resp); err != nil {
-                       log.Errorf("HTTP1 analyze failure: %v", err)
-                       return false
-               }
-       } else {
-               log.Debugf("connnection: %s, remaining half data list size: 
%d", connectionID, connectionMetrics.halfData.Len())
+       var result protocol.ParseResult
+       switch messageType {
+       case reader.MessageTypeRequest:
+               result, err = h.handleRequest(connectionMetrics, buf)
+       case reader.MessageTypeResponse:
+               result, err = h.handleResponse(connectionID, connectionMetrics, 
buf)
+       case reader.MessageTypeUnknown:
+               return protocol.ParseResultSkipPackage
        }
-       return true
-}
 
-func (h *Analyzer) UpdateExtensionConfig(config *profiling.ExtensionConfig) {
-       if config == nil {
-               return
+       if err != nil {
+               log.Warnf("reading %v error: %v", messageType, err)
+               return protocol.ParseResultSkipPackage
+       } else if result != protocol.ParseResultSuccess {
+               return result
        }
-       h.sampleConfig.UpdateRules(config.NetworkSamplings)
+       return protocol.ParseResultSuccess
 }
 
-func (h *Analyzer) combineAndRemoveEvent(halfConnections *list.List, 
firstElement *list.Element,
-       lastAppender protocol.SocketDataBuffer) protocol.SocketDataBuffer {
-       firstEvent := firstElement.Value.(*protocol.SocketDataUploadEvent)
-       if firstEvent.Sequence == 0 && firstEvent.Finished == 1 {
-               halfConnections.Remove(firstElement)
-               return h.combineEventIfNeed(firstEvent, lastAppender)
+func (h *Analyzer) handleRequest(metrics *ConnectionMetrics, buf 
*protocol.Buffer) (protocol.ParseResult, error) {
+       // parsing request
+       req, r, err := reader.ReadRequest(buf)
+       if err != nil {
+               return protocol.ParseResultSkipPackage, err
        }
-       next := firstElement.Next()
-       halfConnections.Remove(firstElement)
-       var buffer protocol.SocketDataBuffer = firstEvent
-       // for-each the events until buffer finished
-       for next != nil {
-               event := next.Value.(*protocol.SocketDataUploadEvent)
-
-               buffer = buffer.Combine(event)
-
-               tmp := next.Next()
-               halfConnections.Remove(next)
-               next = tmp
-               // combine event
-               if event.Finished == 1 {
-                       return h.combineEventIfNeed(buffer, lastAppender)
-               }
+       if r != protocol.ParseResultSuccess {
+               return r, nil
        }
-       return h.combineEventIfNeed(buffer, lastAppender)
-}
 
-func (h *Analyzer) combineEventIfNeed(data, appender 
protocol.SocketDataBuffer) protocol.SocketDataBuffer {
-       if appender != nil {
-               return data.Combine(appender)
-       }
-       return data
+       metrics.AppendRequestToList(req)
+       return protocol.ParseResultSuccess, nil
 }
 
-func (h *Analyzer) buildHTTP1(halfConnections *list.List, event 
*protocol.SocketDataUploadEvent) (request, response protocol.SocketDataBuffer) {
-       // no connections, then just add the response to the half connections 
to wait the request
-       if halfConnections.Len() == 0 {
-               halfConnections.PushBack(event)
-               return nil, nil
+func (h *Analyzer) handleResponse(connectionID uint64, metrics 
*ConnectionMetrics, buf *protocol.Buffer) (protocol.ParseResult, error) {
+       // find the first request
+       firstElement := metrics.halfData.Front()
+       if firstElement == nil {
+               return protocol.ParseResultSkipPackage, nil
        }
+       request := metrics.halfData.Remove(firstElement).(*reader.Request)
 
-       // quick handler(only one element, and is request)
-       if halfConnections.Len() == 1 {
-               firstElement := halfConnections.Front()
-               firstEvent := 
firstElement.Value.(*protocol.SocketDataUploadEvent)
-               if firstEvent.IsStart() && firstEvent.Finished == 1 && 
event.IsStart() && event.Finished == 1 &&
-                       firstEvent.DataID+1 == event.DataID && 
firstEvent.MsgType == base.SocketMessageTypeRequest &&
-                       event.MsgType == base.SocketMessageTypeResponse {
-                       return h.combineAndRemoveEvent(halfConnections, 
firstElement, nil), event
-               }
-       }
-
-       // push to the queue
-       h.insertToList(halfConnections, event)
-
-       // trying to find completed request and response
-       return NewHTTP1BufferAnalyzer(h).Analyze(halfConnections)
-}
-
-func (h *Analyzer) insertToList(halfConnections *list.List, event 
*protocol.SocketDataUploadEvent) {
-       if halfConnections.Len() == 0 {
-               halfConnections.PushFront(event)
-               return
-       }
-       if 
halfConnections.Back().Value.(*protocol.SocketDataUploadEvent).DataID < 
event.DataID {
-               halfConnections.PushBack(event)
-               return
-       }
-       beenAdded := false
-       for element := halfConnections.Front(); element != nil; element = 
element.Next() {
-               existEvent := element.Value.(*protocol.SocketDataUploadEvent)
-               if existEvent.DataID > event.DataID {
-                       // data id needs order
-                       beenAdded = true
-               } else if existEvent.DataID == event.DataID {
-                       if existEvent.MsgType == event.MsgType && 
existEvent.Sequence > event.Sequence {
-                               // same message type and following the sequence 
order
-                               beenAdded = true
-                       } else if existEvent.MsgType > event.MsgType {
-                               // request needs before response
-                               beenAdded = true
-                       }
-               }
-               if beenAdded {
-                       halfConnections.InsertBefore(event, element)
-                       break
-               }
-       }
-       if !beenAdded {
-               halfConnections.PushBack(event)
-       }
-}
-
-func (h *Analyzer) analyze(_ protocol.Context, connectionID string, 
connectionMetrics *ConnectionMetrics,
-       requestBuffer, responseBuffer protocol.SocketDataBuffer) error {
-       request, err := 
http.ReadRequest(bufio.NewReader(bytes.NewBuffer(requestBuffer.BufferData())))
-       if err != nil {
-               return fmt.Errorf("parse request failure: data length: %d, 
total data length: %d, %v",
-                       len(requestBuffer.BufferData()), 
requestBuffer.TotalSize(), err)
-       }
-
-       response, err := 
http.ReadResponse(bufio.NewReader(bytes.NewBuffer(responseBuffer.BufferData())),
 request)
-       if response != nil {
-               defer response.Body.Close()
-       }
+       // parsing request
+       response, r, err := reader.ReadResponse(request, buf)
        if err != nil {
-               if err == io.ErrUnexpectedEOF || err == io.EOF {
-                       response, err = 
h.tryingToReadResponseWithoutHeaders(bufio.NewReader(bytes.NewBuffer(responseBuffer.BufferData())),
 request)
-                       if err != nil {
-                               return fmt.Errorf("parsing simple data error: 
%v", err)
-                       }
-                       if response != nil && response.Body != nil {
-                               defer response.Body.Close()
-                       }
-               }
-               if err != nil {
-                       return fmt.Errorf("parse response failure, data length: 
%d, total data length: %d, %v",
-                               len(requestBuffer.BufferData()), 
requestBuffer.TotalSize(), err)
-               }
+               return protocol.ParseResultSkipPackage, err
+       } else if r != protocol.ParseResultSuccess {
+               return r, nil
        }
 
        // lock append metrics with read locker
-       connectionMetrics.metricsLocker.RLock()
-       defer connectionMetrics.metricsLocker.RUnlock()
+       metrics.metricsLocker.RLock()
+       defer metrics.metricsLocker.RUnlock()
 
        // append metrics
-       data := connectionMetrics.clientMetrics
+       data := metrics.clientMetrics
        side := base.ConnectionRoleClient
-       if requestBuffer.Direction() == base.SocketDataDirectionIngress {
+       if request.Direction() == base.SocketDataDirectionIngress {
                // if receive the request, that's mean is server side
-               data = connectionMetrics.serverMetrics
+               data = metrics.serverMetrics
                side = base.ConnectionRoleServer
        }
-       data.Append(h.sampleConfig, request, requestBuffer, response, 
responseBuffer)
+       data.Append(h.sampleConfig, request, response)
 
        if log.Enable(logrus.DebugLevel) {
                metricsJSON, _ := json.Marshal(data)
-               log.Debugf("generated metrics, connection id: %s, side: %s, 
metrisc: %s", connectionID, side.String(), string(metricsJSON))
+               log.Debugf("generated metrics, connection id: %d, side: %s, 
metrisc: %s", connectionID, side.String(), string(metricsJSON))
        }
-       return nil
+       return protocol.ParseResultSuccess, nil
 }
 
-func (h *Analyzer) tryingToReadResponseWithoutHeaders(reader *bufio.Reader, 
request *http.Request) (*http.Response, error) {
-       if reader.Size() < 16 {
-               return nil, fmt.Errorf("the header length not enough")
-       }
-       tp := textproto.NewReader(reader)
-       resp := &http.Response{
-               Request: request,
+func (h *Analyzer) UpdateExtensionConfig(config *profiling.ExtensionConfig) {
+       if config == nil {
+               return
        }
+       h.sampleConfig.UpdateRules(config.NetworkSamplings)
+}
 
-       line, err := tp.ReadLine()
-       if err != nil {
-               return nil, fmt.Errorf("read response first line failure: %v", 
err)
-       }
-       indexByte := strings.IndexByte(line, ' ')
-       if indexByte == -1 {
-               return nil, fmt.Errorf("parsing response error: %s", line)
-       }
-       resp.Proto = line[:indexByte]
-       resp.Status = strings.TrimLeft(line[indexByte+1:], " ")
-       statusCode := resp.Status
-       if i := strings.IndexByte(resp.Status, ' '); i != -1 {
-               statusCode = resp.Status[:i]
+func (h *Analyzer) PackageMaxExpireDuration() time.Duration {
+       return time.Minute
+}
+
+func (m *ConnectionMetrics) AppendRequestToList(req *reader.Request) {
+       if m.halfData.Len() == 0 {
+               m.halfData.PushFront(req)
+               return
        }
-       if len(statusCode) != 3 {
-               return nil, fmt.Errorf("parsing response status code failure: 
%v", statusCode)
+       if m.halfData.Back().Value.(*reader.Request).MinDataID() < 
req.MinDataID() {
+               m.halfData.PushBack(req)
+               return
        }
-       resp.StatusCode, err = strconv.Atoi(statusCode)
-       if err != nil || resp.StatusCode < 0 {
-               return nil, fmt.Errorf("status code not correct: %s", 
statusCode)
+       beenAdded := false
+       for element := m.halfData.Front(); element != nil; element = 
element.Next() {
+               existEvent := element.Value.(*reader.Request)
+               if existEvent.MinDataID() > req.MinDataID() {
+                       m.halfData.InsertBefore(req, element)
+                       beenAdded = true
+                       break
+               }
        }
-       var ok bool
-       if resp.ProtoMajor, resp.ProtoMinor, ok = 
http.ParseHTTPVersion(resp.Proto); !ok {
-               return nil, fmt.Errorf("parsing http version failure: %s", 
resp.Proto)
+       if !beenAdded {
+               m.halfData.PushBack(req)
        }
-
-       return resp, nil
 }
 
-func (h *ConnectionMetrics) MergeMetricsFromConnection(connection 
*base.ConnectionContext, data base.ConnectionMetrics) {
+func (m *ConnectionMetrics) MergeMetricsFromConnection(connection 
*base.ConnectionContext, data base.ConnectionMetrics) {
        other := data.(*ConnectionMetrics)
        other.metricsLocker.Lock()
        defer other.metricsLocker.Unlock()
 
-       h.clientMetrics.MergeAndClean(other.clientMetrics)
-       h.serverMetrics.MergeAndClean(other.serverMetrics)
+       if other.halfData != nil {
+               for element := other.halfData.Front(); element != nil; element 
= element.Next() {
+                       m.AppendRequestToList(element.Value.(*reader.Request))
+               }
+       }
+
+       m.clientMetrics.MergeAndClean(other.clientMetrics)
+       m.serverMetrics.MergeAndClean(other.serverMetrics)
        if log.Enable(logrus.DebugLevel) {
-               clientMetrics, _ := json.Marshal(h.clientMetrics)
-               serverMetrics, _ := json.Marshal(h.serverMetrics)
+               clientMetrics, _ := json.Marshal(m.clientMetrics)
+               serverMetrics, _ := json.Marshal(m.serverMetrics)
                log.Debugf("combine metrics: conid: %d_%d, client side metrics: 
%s, server side metrics: %s",
                        connection.ConnectionID, connection.RandomID, 
clientMetrics, serverMetrics)
        }
 }
 
-func (h *ConnectionMetrics) FlushMetrics(traffic *base.ProcessTraffic, 
metricsBuilder *base.MetricsBuilder) {
+func (m *ConnectionMetrics) FlushMetrics(traffic *base.ProcessTraffic, 
metricsBuilder *base.MetricsBuilder) {
        for _, p := range traffic.LocalProcesses {
                // if the remote process is profiling, then used the client side
-               localMetrics := h.clientMetrics
-               remoteMetrics := h.serverMetrics
+               localMetrics := m.clientMetrics
+               remoteMetrics := m.serverMetrics
                if traffic.Role == base.ConnectionRoleServer {
-                       localMetrics = h.serverMetrics
-                       remoteMetrics = h.clientMetrics
+                       localMetrics = m.serverMetrics
+                       remoteMetrics = m.clientMetrics
                }
 
                metricsCount := localMetrics.appendMetrics(traffic, p, "", 
metricsBuilder, false)
@@ -362,15 +240,7 @@ func (h *ConnectionMetrics) FlushMetrics(traffic 
*base.ProcessTraffic, metricsBu
                        // if remote process is profiling, then the metrics 
data need to be cut half
                        log.Debugf("flush HTTP1 metrics(%s): %s, remote process 
is profiling: %t, client(%s), server(%s)",
                                traffic.Role.String(), 
traffic.GenerateConnectionInfo(), traffic.RemoteProcessIsProfiling(),
-                               h.clientMetrics.String(), 
h.serverMetrics.String())
-               }
-       }
-}
-
-func (h *ConnectionMetrics) MergeFrom(analyzer *Analyzer, other 
*ConnectionMetrics) {
-       if other.halfData != nil {
-               for element := other.halfData.Front(); element != nil; element 
= element.Next() {
-                       analyzer.insertToList(h.halfData, 
element.Value.(*protocol.SocketDataUploadEvent))
+                               m.clientMetrics.String(), 
m.serverMetrics.String())
                }
        }
 }
diff --git 
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer_test.go 
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer_test.go
deleted file mode 100644
index a663a6e..0000000
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer_test.go
+++ /dev/null
@@ -1,310 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package http1
-
-import (
-       "bufio"
-       "container/list"
-       "net/http"
-       "reflect"
-       "strings"
-       "testing"
-
-       base2 
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
-
-       
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
-)
-
-var unknown, request, response = 0, 1, 2
-var finished, notFinished = 1, 0
-
-// nolint
-func TestBuildHTTP1(t *testing.T) {
-       tests := []struct {
-               name   string
-               events []struct {
-                       dataID   int
-                       dataType int
-                       sequence int
-                       finished int
-                       data     string
-               }
-               http []struct {
-                       start int
-                       end   int
-               }
-               residueID []int
-       }{
-               {
-                       name: "simple",
-                       events: []struct {
-                               dataID   int
-                               dataType int
-                               sequence int
-                               finished int
-                               data     string
-                       }{
-                               {1, request, 0, notFinished, ""},
-                               {1, request, 1, notFinished, ""},
-                               {1, request, 2, finished, ""},
-                               {2, response, 0, notFinished, ""},
-                               {2, response, 1, finished, ""},
-
-                               {3, request, 0, finished, ""},
-                               {4, response, 0, finished, ""},
-                       },
-                       http: []struct {
-                               start int
-                               end   int
-                       }{
-                               {1, 2},
-                               {3, 4},
-                       },
-                       residueID: []int{},
-               },
-               {
-                       name: "response before request",
-                       events: []struct {
-                               dataID   int
-                               dataType int
-                               sequence int
-                               finished int
-                               data     string
-                       }{
-                               {2, response, 0, finished, ""},
-                               {1, request, 1, finished, ""},
-                               {1, request, 0, notFinished, ""},
-
-                               {3, request, 0, notFinished, ""},
-                               {3, request, 1, notFinished, ""},
-                               {4, response, 1, finished, ""},
-                               {4, response, 0, notFinished, ""},
-                               {3, request, 2, finished, ""},
-
-                               {5, request, 0, notFinished, ""},
-                               {5, request, 1, notFinished, ""},
-                               {6, response, 1, finished, ""},
-                               {6, response, 0, notFinished, ""},
-                               {5, request, 2, finished, ""},
-                       },
-                       http: []struct {
-                               start int
-                               end   int
-                       }{
-                               {1, 2},
-                               {3, 4},
-                               {5, 6},
-                       },
-                       residueID: []int{},
-               },
-               {
-                       name: "residue requests",
-                       events: []struct {
-                               dataID   int
-                               dataType int
-                               sequence int
-                               finished int
-                               data     string
-                       }{
-                               {1, request, 0, finished, ""},
-                               {2, response, 1, finished, ""},
-                               {2, response, 0, notFinished, ""},
-
-                               {3, request, 0, finished, ""},
-                               {4, response, 0, notFinished, ""},
-
-                               {5, request, 1, finished, ""},
-                               {6, response, 0, finished, ""},
-                       },
-                       http: []struct {
-                               start int
-                               end   int
-                       }{
-                               {1, 2},
-                       },
-                       residueID: []int{3, 4, 5, 6},
-               },
-               {
-                       name: "multiple request",
-                       events: []struct {
-                               dataID   int
-                               dataType int
-                               sequence int
-                               finished int
-                               data     string
-                       }{
-                               {1, request, 0, finished, ""},
-                               {2, request, 0, finished, ""},
-                       },
-                       http: []struct {
-                               start int
-                               end   int
-                       }{},
-                       residueID: []int{1, 2},
-               },
-               {
-                       name: "multiple response",
-                       events: []struct {
-                               dataID   int
-                               dataType int
-                               sequence int
-                               finished int
-                               data     string
-                       }{
-                               {1, request, 0, finished, ""},
-                               {3, response, 1, finished, ""},
-                               {4, response, 0, finished, ""},
-                       },
-                       http: []struct {
-                               start int
-                               end   int
-                       }{},
-                       residueID: []int{1, 3, 4},
-               },
-               {
-                       name: "unfinished response",
-                       events: []struct {
-                               dataID   int
-                               dataType int
-                               sequence int
-                               finished int
-                               data     string
-                       }{
-                               {42, response, 0, notFinished, ""},
-                               {42, response, 1, notFinished, ""},
-                               {42, response, 2, finished, ""},
-
-                               {48, request, 0, finished, ""},
-                               {50, response, 0, notFinished, ""},
-                               {50, response, 1, notFinished, ""},
-                               {50, response, 2, finished, ""},
-                       },
-                       http: []struct {
-                               start int
-                               end   int
-                       }{
-                               {48, 50},
-                       },
-                       residueID: []int{42, 42, 42},
-               },
-               {
-                       name: "unknown data",
-                       events: []struct {
-                               dataID   int
-                               dataType int
-                               sequence int
-                               finished int
-                               data     string
-                       }{
-                               {1, unknown, 0, notFinished, "GET / 
HTTP/1.1\r\n"},
-                               {1, unknown, 1, notFinished, "Host: 
test.com\n\r\n"},
-                               {2, response, 0, finished, ""},
-
-                               {3, unknown, 1, notFinished, "Host: 
test.com\n\r\n"},
-                               {4, response, 0, finished, ""},
-                               {3, unknown, 0, notFinished, "GET / 
HTTP/1.1\r\n"},
-
-                               {6, unknown, 1, notFinished, "Host: 
test.com\n\r\n"},
-                               {5, request, 0, finished, ""},
-                               {6, unknown, 0, notFinished, "HTTP/1.1 200 
OK\r\n"},
-
-                               // request not finished
-                               {7, unknown, 1, notFinished, "Host: 
test.com\n"},
-                               {8, response, 0, finished, ""},
-                               {7, unknown, 0, notFinished, "GET / 
HTTP/1.1\r\n"},
-                               {9, request, 0, finished, ""},
-                               {10, response, 0, finished, ""},
-                       },
-                       http: []struct {
-                               start int
-                               end   int
-                       }{
-                               {1, 2},
-                               {3, 4},
-                               {5, 6},
-                               {9, 10},
-                       },
-                       residueID: []int{7, 7, 8},
-               },
-       }
-
-       for _, testCase := range tests {
-               //t.Run(testCase.name, func(t *testing.T) {
-               analyzer := NewHTTP1Analyzer().(*Analyzer)
-               l := list.New()
-               var events = make([]struct {
-                       start, end int
-               }, 0)
-               for _, event := range testCase.events {
-                       req, resp := analyzer.buildHTTP1(l, 
&base2.SocketDataUploadEvent{
-                               DataID:   uint64(event.dataID),
-                               MsgType:  
base.SocketMessageType(event.dataType),
-                               Sequence: uint16(event.sequence),
-                               Finished: uint8(event.finished),
-                               Buffer:   bufferConvert(event.data),
-                               DataLen:  uint16(len(event.data)),
-                       })
-                       if req != nil && resp != nil {
-                               events = append(events, struct{ start, end int 
}{start: req.MinDataID(), end: resp.MaxDataID()})
-                       }
-               }
-
-               if !reflect.DeepEqual(testCase.http, events) {
-                       t.Fatalf("excepted http: %v, actual: %v", 
testCase.http, events)
-               }
-
-               exceptedList := testCase.residueID
-               if exceptedList == nil {
-                       exceptedList = make([]int, 0)
-               }
-               actualList := make([]int, 0)
-               for element := l.Front(); element != nil; element = 
element.Next() {
-                       actualList = append(actualList, 
int(element.Value.(*base2.SocketDataUploadEvent).DataID))
-               }
-               if !reflect.DeepEqual(exceptedList, actualList) {
-                       t.Fatalf("excepted residue data list: %v, actual: %v", 
exceptedList, actualList)
-               }
-               //})
-       }
-}
-
-var defaultBuffer [2048]byte
-
-func bufferConvert(data string) [2048]byte {
-       if data == "" {
-               return defaultBuffer
-       }
-       var buffer [2048]byte
-       for inx, d := range []byte(data) {
-               buffer[inx] = d
-       }
-       return buffer
-}
-
-func TestParseSimpleHTTP1Response(t *testing.T) {
-       s := `HTTP/1.0 200 OK\r\n`
-       h := &http.Request{}
-       analyzer := NewHTTP1Analyzer().(*Analyzer)
-       resp, err := 
analyzer.tryingToReadResponseWithoutHeaders(bufio.NewReader(strings.NewReader(s)),
 h)
-       if err != nil {
-               t.Fatalf("reading simple response error: %v", err)
-       }
-       if resp.Body != nil {
-               defer resp.Body.Close()
-       }
-}
diff --git 
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/builder.go 
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/builder.go
deleted file mode 100644
index a55ae5f..0000000
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/builder.go
+++ /dev/null
@@ -1,290 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package http1
-
-import (
-       "bufio"
-       "bytes"
-       "container/list"
-       "net/http"
-
-       base2 
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
-
-       
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
-)
-
-type BufferAnalyzer struct {
-       http1Analyzer *Analyzer
-
-       unknownEventBuffer base2.SocketDataBuffer
-       unknownElement     *list.Element
-       unknownSize        int
-       request            *base2.SocketDataUploadEvent
-       requestElement     *list.Element
-       response           *base2.SocketDataUploadEvent
-       responseElement    *list.Element
-
-       unknownDataID      uint64
-       unknownMaxSequence uint16
-       reqDataID          uint64
-       reqMaxSequence     uint16
-       reqFinished        bool
-       respDataID         uint64
-       respMaxSequence    uint16
-       respFinished       bool
-}
-
-func NewHTTP1BufferAnalyzer(http1 *Analyzer) *BufferAnalyzer {
-       return &BufferAnalyzer{http1Analyzer: http1}
-}
-
-func (h *BufferAnalyzer) Analyze(events *list.List) (request, response 
base2.SocketDataBuffer) {
-       for element := events.Front(); element != nil; element = element.Next() 
{
-               curEvent := element.Value.(*base2.SocketDataUploadEvent)
-               // transform the unknown to the request or response
-               if continueReading, req, resp := h.handleUnknown(events, 
element, curEvent); req != nil && resp != nil {
-                       return req, resp
-               } else if continueReading {
-                       continue
-               }
-
-               if continueReading, req, resp := h.handleRequest(events, 
element, curEvent); req != nil && resp != nil {
-                       return req, resp
-               } else if continueReading {
-                       continue
-               }
-
-               if req, resp := h.handleResponse(events, element, curEvent); 
req != nil && resp != nil {
-                       return req, resp
-               }
-       }
-       return nil, nil
-}
-
-func (h *BufferAnalyzer) handleUnknown(event *list.List, element *list.Element,
-       curEvent *base2.SocketDataUploadEvent) (continueReading bool, req, resp 
base2.SocketDataBuffer) {
-       if curEvent.MsgType != base.SocketMessageTypeUnknown {
-               return false, nil, nil
-       }
-       if h.unknownEventBuffer == base2.SocketDataBuffer(nil) {
-               // maybe the unknown type is response, so clean the context
-               if !curEvent.IsStart() {
-                       h.cleanContext()
-                       return true, nil, nil
-               }
-               h.resetStartUnknown(element, curEvent)
-               req, resp = h.tryingToAnalyzeTheUnknown(event, curEvent)
-               if req != nil && resp != nil {
-                       return false, req, resp
-               }
-               return true, nil, nil
-       }
-       if curEvent.MsgType == base.SocketMessageTypeUnknown {
-               if h.unknownDataID == curEvent.DataID && h.unknownMaxSequence+1 
== curEvent.Sequence {
-                       h.unknownEventBuffer = 
h.unknownEventBuffer.Combine(curEvent)
-                       h.unknownMaxSequence++
-               } else if curEvent.IsStart() {
-                       h.resetStartUnknown(element, curEvent)
-               } else {
-                       h.cleanContext()
-               }
-
-               req, resp = h.tryingToAnalyzeTheUnknown(event, curEvent)
-               if req != nil && resp != nil {
-                       return false, req, resp
-               }
-               return true, nil, nil
-       }
-       return false, nil, nil
-}
-
-func (h *BufferAnalyzer) handleRequest(events *list.List, element 
*list.Element,
-       curEvent *base2.SocketDataUploadEvent) (continueReading bool, req, resp 
base2.SocketDataBuffer) {
-       if h.request == nil {
-               // find the first request package event
-               if curEvent.MsgType == base.SocketMessageTypeRequest && 
curEvent.IsStart() {
-                       h.resetStartRequest(element, curEvent)
-               }
-               return true, nil, nil
-       }
-       if curEvent.MsgType == base.SocketMessageTypeRequest {
-               // if the request not finished and latest request sequence 
match with current event
-               // then keep the request tracing
-               if !h.reqFinished && h.reqDataID == curEvent.DataID && 
h.reqMaxSequence+1 == curEvent.Sequence {
-                       h.reqMaxSequence++
-                       h.reqFinished = curEvent.IsFinished()
-               } else if curEvent.IsStart() {
-                       // if current event is new one, then update to current 
request
-                       h.resetStartRequest(element, curEvent)
-               } else {
-                       // Otherwise, clean the request and response context
-                       h.cleanContext()
-               }
-
-               // if request and response all finished, then return
-               if h.reqFinished && h.respFinished {
-                       req, resp = h.buildHTTP(events)
-                       return false, req, resp
-               }
-
-               return true, nil, nil
-       }
-       return false, nil, nil
-}
-
-func (h *BufferAnalyzer) handleResponse(events *list.List, element 
*list.Element,
-       curEvent *base2.SocketDataUploadEvent) (req, resp 
base2.SocketDataBuffer) {
-       if h.response == nil {
-               // if current response is not start, then clean to re-find new 
one
-               if !curEvent.IsStart() {
-                       h.cleanContext()
-                       return nil, nil
-               }
-               h.resetStartResponse(element, curEvent)
-               if h.reqFinished && h.respFinished {
-                       return h.buildHTTP(events)
-               }
-               return nil, nil
-       }
-
-       // if a new response, then clean the re-find new one, wait the previous 
data
-       if curEvent.IsStart() {
-               h.cleanContext()
-               return nil, nil
-       }
-
-       // if response sequence is broken, then clean the context
-       if h.respDataID != curEvent.DataID || h.respMaxSequence+1 != 
curEvent.Sequence {
-               h.cleanContext()
-               return nil, nil
-       }
-       h.respDataID = curEvent.DataID
-       h.respMaxSequence = curEvent.Sequence
-
-       if h.reqFinished && curEvent.IsFinished() {
-               return h.buildHTTP(events)
-       }
-       return nil, nil
-}
-
-func (h *BufferAnalyzer) resetStartUnknown(element *list.Element, curEvent 
*base2.SocketDataUploadEvent) {
-       h.unknownEventBuffer = curEvent
-       h.unknownElement = element
-       h.unknownDataID = curEvent.DataID
-       h.unknownMaxSequence = curEvent.Sequence
-}
-
-func (h *BufferAnalyzer) resetStartRequest(element *list.Element, curEvent 
*base2.SocketDataUploadEvent) {
-       h.request = curEvent
-       h.reqDataID = curEvent.DataID
-       h.reqMaxSequence = curEvent.Sequence
-       h.reqFinished = curEvent.IsFinished()
-       h.requestElement = element
-}
-
-func (h *BufferAnalyzer) resetStartResponse(element *list.Element, curEvent 
*base2.SocketDataUploadEvent) {
-       h.response = curEvent
-       h.respDataID = curEvent.DataID
-       h.respMaxSequence = curEvent.Sequence
-       h.responseElement = element
-       h.respFinished = curEvent.IsFinished()
-}
-
-func (h *BufferAnalyzer) tryingToAnalyzeTheUnknown(events *list.List, curEvent 
*base2.SocketDataUploadEvent) (req, resp base2.SocketDataBuffer) {
-       if h.unknownEventBuffer == base2.SocketDataBuffer(nil) {
-               return nil, nil
-       }
-       // length not enough
-       if len(h.unknownEventBuffer.BufferData()) < 16 {
-               return nil, nil
-       }
-       _, err := 
http.ReadRequest(bufio.NewReader(bytes.NewBuffer(h.unknownEventBuffer.BufferData())))
-       if err == nil {
-               // update the event as request
-               curEvent.Finished = 1
-               h.transformUnknown(h.unknownElement, 
base.SocketMessageTypeRequest)
-               // update the current data is request
-               h.resetStartRequest(h.unknownElement, 
h.unknownEventBuffer.FirstEvent())
-               h.reqFinished = true
-               h.cleanResponseContext()
-               h.cleanUnknownContext()
-               return nil, nil
-       }
-       tmpResponse, err := 
http.ReadResponse(bufio.NewReader(bytes.NewBuffer(h.unknownEventBuffer.BufferData())),
 &http.Request{})
-       if err == nil {
-               defer tmpResponse.Body.Close()
-               curEvent.Finished = 1
-               h.transformUnknown(h.unknownElement, 
base.SocketMessageTypeResponse)
-               // if request already finished, then remove the request
-               if h.reqFinished {
-                       h.resetStartResponse(h.unknownElement, 
h.unknownEventBuffer.FirstEvent())
-                       return h.buildHTTP(events)
-               }
-               // otherwise, clean the context and wait request
-               h.cleanContext()
-       }
-       return nil, nil
-}
-
-func (h *BufferAnalyzer) transformUnknown(element *list.Element, msgType 
base.SocketMessageType) {
-       // update message type and total size
-       firstEvent := element.Value.(*base2.SocketDataUploadEvent)
-       firstEvent.MsgType = msgType
-       dataLen := int(firstEvent.DataLen)
-       for e := element.Next(); e != nil; e = e.Next() {
-               curEvent := e.Value.(*base2.SocketDataUploadEvent)
-               if curEvent.Finished == 1 {
-                       curEvent.MsgType = msgType
-                       dataLen += int(curEvent.DataLen)
-                       firstEvent.TotalSize0 = uint64(dataLen)
-                       return
-               }
-               curEvent.MsgType = msgType
-               dataLen += int(curEvent.DataLen)
-       }
-}
-
-func (h *BufferAnalyzer) cleanContext() {
-       h.cleanUnknownContext()
-       h.cleanRequestContext()
-       h.cleanResponseContext()
-}
-
-func (h *BufferAnalyzer) cleanResponseContext() {
-       h.response = nil
-       h.respDataID = 0
-       h.respMaxSequence = 0
-       h.respFinished = false
-}
-
-func (h *BufferAnalyzer) cleanRequestContext() {
-       h.request = nil
-       h.reqDataID = 0
-       h.reqMaxSequence = 0
-       h.reqFinished = false
-}
-
-func (h *BufferAnalyzer) cleanUnknownContext() {
-       h.unknownEventBuffer, h.unknownElement = nil, nil
-       h.unknownSize, h.unknownDataID, h.unknownMaxSequence = 0, 0, 0
-}
-
-func (h *BufferAnalyzer) buildHTTP(events *list.List) (req, resp 
base2.SocketDataBuffer) {
-       return h.http1Analyzer.combineAndRemoveEvent(events, h.requestElement, 
nil),
-               h.http1Analyzer.combineAndRemoveEvent(events, 
h.responseElement, nil)
-}
diff --git 
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go 
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go
index 5756206..9290d34 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go
@@ -18,23 +18,16 @@
 package http1
 
 import (
-       "bufio"
-       "bytes"
-       "compress/gzip"
        "encoding/json"
        "fmt"
-       "io"
-       "mime"
-       "net/http"
        "strings"
        "time"
 
-       "golang.org/x/net/html/charset"
-
        "github.com/apache/skywalking-rover/pkg/process/api"
        profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
        
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
        protocol 
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
+       
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader"
        
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/metrics"
        "github.com/apache/skywalking-rover/pkg/tools"
        "github.com/apache/skywalking-rover/pkg/tools/host"
@@ -80,27 +73,28 @@ func NewHTTP1URIMetrics() *URIMetrics {
        }
 }
 
-func (u *URIMetrics) Append(sampleConfig *SamplingConfig,
-       req *http.Request, reqBuffer protocol.SocketDataBuffer, resp 
*http.Response, respBuffer protocol.SocketDataBuffer) {
+func (u *URIMetrics) Append(sampleConfig *SamplingConfig, req *reader.Request, 
resp *reader.Response) {
        u.RequestCounter.Increase()
-       statusCounter := u.StatusCounter[resp.StatusCode]
+       statusCounter := u.StatusCounter[resp.StatusCode()]
        if statusCounter == nil {
                statusCounter = metrics.NewCounter()
-               u.StatusCounter[resp.StatusCode] = statusCounter
+               u.StatusCounter[resp.StatusCode()] = statusCounter
        }
        statusCounter.Increase()
 
-       u.AvgRequestPackageSize.Increase(float64(reqBuffer.TotalSize()))
-       u.AvgResponsePackageSize.Increase(float64(respBuffer.TotalSize()))
-       u.ReqPackageSizeHistogram.Increase(float64(reqBuffer.TotalSize()))
-       u.RespPackageSizeHistogram.Increase(float64(respBuffer.TotalSize()))
+       requestTotalSize := req.ContentTotalSize()
+       responseTotalSize := resp.ContentTotalSize()
+       u.AvgRequestPackageSize.Increase(float64(requestTotalSize))
+       u.AvgResponsePackageSize.Increase(float64(responseTotalSize))
+       u.ReqPackageSizeHistogram.Increase(float64(requestTotalSize))
+       u.RespPackageSizeHistogram.Increase(float64(responseTotalSize))
 
-       duration := time.Duration(respBuffer.EndTime() - reqBuffer.StartTime())
+       duration := time.Duration(resp.EndTime() - req.StartTime())
        durationInMS := float64(duration.Milliseconds())
        u.avgDuration.Increase(durationInMS)
        u.durationHistogram.Increase(durationInMS)
 
-       u.sampler.AppendMetrics(sampleConfig, duration, req, resp, reqBuffer, 
respBuffer)
+       u.sampler.AppendMetrics(sampleConfig, duration, req, resp)
 }
 
 func (u *URIMetrics) appendMetrics(traffic *base.ProcessTraffic,
@@ -177,15 +171,13 @@ func (u *URIMetrics) String() string {
 }
 
 type Trace struct {
-       Trace          protocol.TracingContext
-       RequestURI     string
-       RequestBuffer  protocol.SocketDataBuffer
-       Request        *http.Request
-       ResponseBuffer protocol.SocketDataBuffer
-       Response       *http.Response
-       Type           string
-       Settings       *profiling.NetworkDataCollectingSettings
-       TaskConfig     *profiling.HTTPSamplingConfig
+       Trace      protocol.TracingContext
+       RequestURI string
+       Request    *reader.Request
+       Response   *reader.Response
+       Type       string
+       Settings   *profiling.NetworkDataCollectingSettings
+       TaskConfig *profiling.HTTPSamplingConfig
 }
 
 func (h *Trace) Flush(duration int64, process api.ProcessInterface, traffic 
*base.ProcessTraffic, metricsBuilder *base.MetricsBuilder) {
@@ -212,7 +204,7 @@ func (h *Trace) Flush(duration int64, process 
api.ProcessInterface, traffic *bas
                SSL:           traffic.IsSSL,
                URI:           h.RequestURI,
                Reason:        h.Type,
-               Status:        h.Response.StatusCode,
+               Status:        h.Response.StatusCode(),
        }
        if traffic.Role == base.ConnectionRoleClient {
                body.ClientProcess = &SamplingTraceLogProcess{ProcessID: 
process.ID()}
@@ -238,35 +230,35 @@ func (h *Trace) Flush(duration int64, process 
api.ProcessInterface, traffic *bas
 func (h *Trace) AppendHTTPEvents(process api.ProcessInterface, traffic 
*base.ProcessTraffic, metricsBuilder *base.MetricsBuilder) {
        events := make([]*v3.SpanAttachedEvent, 0)
        if h.Settings != nil && h.Settings.RequireCompleteRequest {
-               events = h.appendHTTPEvent(events, process, traffic, 
transportRequest, h.Request.Header,
-                       h.Request.Body, h.RequestBuffer, 
h.Settings.MaxRequestSize)
+               events = h.appendHTTPEvent(events, process, traffic, 
transportRequest, h.Request.MessageOpt, h.TaskConfig.DefaultRequestEncoding,
+                       h.Settings.MaxRequestSize)
        }
        if h.Settings != nil && h.Settings.RequireCompleteResponse {
-               events = h.appendHTTPEvent(events, process, traffic, 
transportResponse, h.Response.Header,
-                       h.Response.Body, h.ResponseBuffer, 
h.Settings.MaxResponseSize)
+               events = h.appendHTTPEvent(events, process, traffic, 
transportResponse, h.Response.MessageOpt, h.TaskConfig.DefaultResponseEncoding,
+                       h.Settings.MaxResponseSize)
        }
 
        metricsBuilder.AppendSpanAttachedEvents(events)
 }
 
 func (h *Trace) appendHTTPEvent(events []*v3.SpanAttachedEvent, process 
api.ProcessInterface, traffic *base.ProcessTraffic,
-       tp string, header http.Header, body io.Reader, buffer 
protocol.SocketDataBuffer, maxSize int32) []*v3.SpanAttachedEvent {
-       content, err := h.transformHTTPBody(tp, header, body, buffer, maxSize)
+       tp string, message *reader.MessageOpt, defaultBodyEncoding string, 
maxSize int32) []*v3.SpanAttachedEvent {
+       content, err := message.TransformReadableContent(defaultBodyEncoding, 
int(maxSize))
        if err != nil {
                log.Warnf("transform http %s erorr: %v", tp, err)
                return events
        }
 
        event := &v3.SpanAttachedEvent{}
-       event.StartTime = host.TimeToInstant(buffer.StartTime())
-       event.EndTime = host.TimeToInstant(buffer.EndTime())
+       event.StartTime = host.TimeToInstant(message.StartTime())
+       event.EndTime = host.TimeToInstant(message.EndTime())
        event.Event = fmt.Sprintf("HTTP %s Sampling", tp)
        event.Tags = make([]*commonv3.KeyStringValuePair, 0)
        event.Tags = append(event.Tags,
                // content data
-               &commonv3.KeyStringValuePair{Key: "data_size", Value: 
units.BytesSize(float64(buffer.TotalSize()))},
+               &commonv3.KeyStringValuePair{Key: "data_size", Value: 
units.BytesSize(float64(message.ContentTotalSize()))},
                &commonv3.KeyStringValuePair{Key: "data_content", Value: 
content},
-               &commonv3.KeyStringValuePair{Key: "data_direction", Value: 
buffer.Direction().String()},
+               &commonv3.KeyStringValuePair{Key: "data_direction", Value: 
message.Direction().String()},
                &commonv3.KeyStringValuePair{Key: "data_type", Value: 
strings.ToLower(tp)},
                // connection
                &commonv3.KeyStringValuePair{Key: "connection_role", Value: 
traffic.Role.String()},
@@ -286,112 +278,6 @@ func (h *Trace) appendHTTPEvent(events 
[]*v3.SpanAttachedEvent, process api.Proc
        return append(events, event)
 }
 
-// nolint
-func (h *Trace) transformHTTPBody(tp string, header http.Header, _ io.Reader, 
buffer protocol.SocketDataBuffer, maxSize int32) (string, error) {
-       var needGzip, isPlain, isUtf8 = header.Get("Content-Encoding") == 
"gzip", true, true
-       contentType := header.Get("Content-Type")
-       if contentType == "" {
-               if tp == transportRequest {
-                       contentType = h.TaskConfig.DefaultRequestEncoding
-               } else {
-                       contentType = h.TaskConfig.DefaultResponseEncoding
-               }
-               contentType = fmt.Sprintf("text/html; charset=%s", contentType)
-       }
-
-       isPlain = strings.HasPrefix(contentType, "text/") || contentType == 
"application/json"
-       if _, params, err := mime.ParseMediaType(contentType); err == nil {
-               if cs, ok := params["charset"]; ok {
-                       isUtf8 = strings.ToLower(cs) == "utf-8"
-               }
-       }
-
-       if !needGzip && isPlain && isUtf8 {
-               resultSize := len(buffer.BufferData())
-               if maxSize > 0 && resultSize > int(maxSize) {
-                       resultSize = int(maxSize)
-               }
-               return string(buffer.BufferData()[0:resultSize]), nil
-       }
-
-       // re-read the buffer and skip to the body position
-       buf := bufio.NewReaderSize(bytes.NewBuffer(buffer.BufferData()), 
len(buffer.BufferData()))
-       var httpBody io.ReadCloser
-       if tp == transportRequest {
-               req, err := http.ReadRequest(buf)
-               if err != nil {
-                       return "", err
-               }
-               httpBody = req.Body
-       } else {
-               response, err := http.ReadResponse(buf, nil)
-               if err != nil {
-                       return "", err
-               }
-               httpBody = response.Body
-       }
-       defer httpBody.Close()
-
-       // no text plain, no need to print the data
-       headerLen := len(buffer.BufferData()) - buf.Buffered()
-       if maxSize > 0 && int(maxSize) < headerLen {
-               return string(buffer.BufferData()[:maxSize]), nil
-       }
-       headerString := string(buffer.BufferData()[:headerLen])
-       if !isPlain {
-               return fmt.Sprintf("%s[not plain, current content type: %s]", 
headerString, contentType), nil
-       }
-       // nobody
-       if buf.Buffered() == 0 {
-               return headerString, nil
-       }
-
-       data := httpBody
-       var err error
-       if needGzip {
-               data, err = gzip.NewReader(httpBody)
-               if err != nil {
-                       return "", err
-               }
-       }
-       if !isUtf8 {
-               data, err = newCharsetReader(data, contentType)
-               if err != nil {
-                       return "", err
-               }
-       }
-
-       realData, err := io.ReadAll(data)
-       if err != nil && err != io.ErrUnexpectedEOF {
-               return "", err
-       }
-       resultSize := len(realData)
-       if maxSize > 0 && (resultSize+headerLen) > int(maxSize) {
-               resultSize = int(maxSize) - headerLen
-       }
-       return fmt.Sprintf("%s%s", headerString, 
string(realData[0:resultSize])), nil
-}
-
-type charsetReadWrapper struct {
-       reader io.Reader
-}
-
-func newCharsetReader(r io.Reader, contentType string) (*charsetReadWrapper, 
error) {
-       reader, err := charset.NewReader(r, contentType)
-       if err != nil {
-               return nil, err
-       }
-       return &charsetReadWrapper{reader: reader}, nil
-}
-
-func (c *charsetReadWrapper) Read(p []byte) (n int, err error) {
-       return c.reader.Read(p)
-}
-
-func (c *charsetReadWrapper) Close() error {
-       return nil
-}
-
 type SamplingTraceLogBody struct {
        URI           string                   `json:"uri"`
        Reason        string                   `json:"reason"`
diff --git 
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go 
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
new file mode 100644
index 0000000..90863e8
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
@@ -0,0 +1,298 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package reader
+
+import (
+       "bufio"
+       "compress/gzip"
+       "fmt"
+       "io"
+       "mime"
+       "net/http"
+       "strconv"
+       "strings"
+
+       "github.com/apache/skywalking-rover/pkg/logger"
+
+       "golang.org/x/net/html/charset"
+
+       
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
+       protocol 
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
+)
+
+var (
+       headBuffer = make([]byte, 16)
+       bodyBuffer = make([]byte, 4096)
+
+       requestMethods = []string{
+               "GET", "POST", "OPTION", "HEAD", "PUT", "DELETE", "CONNECT", 
"TRACE", "PATCH",
+       }
+)
+
+var log = logger.GetLogger("profiling", "task", "network", "layer7", 
"protocols", "http1", "reader")
+
+type MessageType int
+
+const (
+       MessageTypeRequest MessageType = iota
+       MessageTypeResponse
+       MessageTypeUnknown
+)
+
+func IdentityMessageType(reader *protocol.Buffer) (MessageType, error) {
+       n, err := reader.Peek(headBuffer)
+       if err != nil {
+               return MessageTypeUnknown, err
+       } else if n != len(headBuffer) {
+               return MessageTypeUnknown, fmt.Errorf("need more content for 
header")
+       }
+
+       headerString := string(headBuffer)
+       isRequest := false
+       for _, method := range requestMethods {
+               if strings.HasPrefix(headerString, method) {
+                       isRequest = true
+                       break
+               }
+       }
+       if isRequest {
+               return MessageTypeRequest, nil
+       }
+
+       if strings.HasPrefix(headerString, "HTTP") {
+               return MessageTypeResponse, nil
+       }
+       return MessageTypeUnknown, nil
+}
+
+type Message interface {
+       Headers() http.Header
+       HeaderBuffer() *protocol.Buffer
+       BodyBuffer() *protocol.Buffer
+}
+
+type MessageOpt struct {
+       Message
+}
+
+func (m *MessageOpt) ContentTotalSize() int {
+       return m.HeaderBuffer().Len() + m.BodyBuffer().Len()
+}
+
+func (m *MessageOpt) StartTime() uint64 {
+       return m.HeaderBuffer().FirstSocketBuffer().StartTime()
+}
+
+func (m *MessageOpt) EndTime() uint64 {
+       return m.HeaderBuffer().LastSocketBuffer().EndTime()
+}
+
+func (m *MessageOpt) Direction() base.SocketDataDirection {
+       return m.HeaderBuffer().FirstSocketBuffer().Direction()
+}
+
+// nolint
+func (m *MessageOpt) TransformReadableContent(defaultEncoding string, maxSize 
int) (string, error) {
+       contentType := m.Headers().Get("Content-Type")
+       if contentType == "" {
+               contentType = fmt.Sprintf("text/html; charset=%s", 
defaultEncoding)
+       }
+       isPlain := strings.HasPrefix(contentType, "text/") || contentType == 
"application/json"
+
+       // header to string
+       headerBuf, err := io.ReadAll(m.HeaderBuffer())
+       if err != nil {
+               return "", err
+       }
+       if maxSize > 0 && len(headerBuf) >= maxSize {
+               return string(headerBuf[:maxSize]), nil
+       }
+       headerString := string(headerBuf)
+       if !isPlain {
+               return fmt.Sprintf("%s[not plain, current content type: %s]", 
headerString, contentType), nil
+       }
+
+       // body to string
+       bodyLength := m.BodyBuffer().Len()
+       if bodyLength == 0 {
+               return headerString, nil
+       }
+       bodyReader, err := m.buildBodyReader(defaultEncoding)
+       if err != nil {
+               return "", err
+       }
+
+       bodyData, err := io.ReadAll(bodyReader)
+       if err != nil && err != io.ErrUnexpectedEOF {
+               return "", err
+       }
+       resultSize := len(bodyData)
+       if maxSize > 0 && (resultSize+len(headerString)) > maxSize {
+               resultSize = maxSize - len(headerString)
+       }
+       return fmt.Sprintf("%s%s", headerString, 
string(bodyData[0:resultSize])), nil
+}
+
+func (m *MessageOpt) buildBodyReader(contentType string) (io.Reader, error) {
+       var needGzip = m.Headers().Get("Content-Encoding") == "gzip"
+       var isUtf8 = true
+       if _, params, err := mime.ParseMediaType(contentType); err == nil {
+               if cs, ok := params["charset"]; ok {
+                       isUtf8 = strings.EqualFold(cs, "utf-8")
+               }
+       }
+
+       var data io.Reader = m.BodyBuffer()
+       var err error
+       if needGzip {
+               data, err = gzip.NewReader(m.BodyBuffer())
+               if err != nil {
+                       return nil, err
+               }
+       }
+       if !isUtf8 {
+               data, err = newCharsetReader(data, contentType)
+               if err != nil {
+                       return nil, err
+               }
+       }
+       return data, nil
+}
+
+func (m *MessageOpt) appointedLength() (int, error) {
+       contentLengthStr := m.Headers().Get("Content-Length")
+       if contentLengthStr == "" {
+               return -1, nil
+       }
+       contentLength, err := strconv.ParseInt(contentLengthStr, 10, 64)
+       if err != nil {
+               return 0, fmt.Errorf("the request has not correct content 
length header value: %s", contentLengthStr)
+       }
+       return int(contentLength), nil
+}
+
+func (m *MessageOpt) isChunked() bool {
+       return m.Headers().Get("Transfer-Encoding") == "chunked"
+}
+
+func (m *MessageOpt) readBodyUntilCurrentPackageFinished(buf *protocol.Buffer, 
reader *bufio.Reader) (*protocol.Buffer, protocol.ParseResult, error) {
+       startPosition := buf.OffsetPosition(-reader.Buffered())
+       for !buf.IsCurrentPacketReadFinished() {
+               _, err := buf.Read(bodyBuffer)
+               if err != nil {
+                       return nil, protocol.ParseResultSkipPackage, err
+               }
+       }
+       endPosition := buf.Position()
+       return buf.Slice(true, startPosition, endPosition), 
protocol.ParseResultSuccess, nil
+}
+
+func (m *MessageOpt) checkChunkedBody(buf *protocol.Buffer, bodyReader 
*bufio.Reader) (*protocol.Buffer, protocol.ParseResult, error) {
+       buffers := make([]*protocol.Buffer, 0)
+       for {
+               line, _, err := bodyReader.ReadLine()
+               if err != nil {
+                       return nil, protocol.ParseResultSkipPackage, err
+               }
+               needBytesStr := string(line)
+               needBytes, err := strconv.ParseInt(needBytesStr, 16, 64)
+               if err != nil {
+                       return nil, protocol.ParseResultSkipPackage, 
fmt.Errorf("read chunked size error: %s", needBytesStr)
+               }
+               if needBytes == 0 {
+                       break
+               }
+               if b, r, err1 := m.checkBodyWithSize(buf, bodyReader, 
int(needBytes), false); err1 != nil {
+                       return nil, protocol.ParseResultSkipPackage, err1
+               } else if r != protocol.ParseResultSuccess {
+                       return nil, r, nil
+               } else {
+                       if pos := b.DetectNotSendingLastPosition(); pos != nil {
+                               log.Debugf("found the socket data not sending 
finished in BPF, so update the body to the latest data, %v", pos)
+                               successSlice := b.Slice(true, b.Position(), pos)
+                               buffers = append(buffers, successSlice)
+                               break
+                       }
+                       buffers = append(buffers, b)
+               }
+               d, _, err := bodyReader.ReadLine()
+               if err != nil {
+                       return nil, protocol.ParseResultSkipPackage, err
+               }
+               if len(d) != 0 {
+                       return nil, protocol.ParseResultSkipPackage, 
fmt.Errorf("the chunk data parding error, should be empty: %s", d)
+               }
+       }
+       return protocol.CombineSlices(true, buffers...), 
protocol.ParseResultSuccess, nil
+}
+
+func (m *MessageOpt) checkBodyWithSize(buf *protocol.Buffer, reader 
*bufio.Reader, size int,
+       detectedNotSending bool) (*protocol.Buffer, protocol.ParseResult, 
error) {
+       reduceSize := size
+       var readSize, lastReadSize int
+       var err error
+       startPosition := buf.OffsetPosition(-reader.Buffered())
+       for reduceSize > 0 {
+               readSize = reduceSize
+               if readSize > len(bodyBuffer) {
+                       readSize = len(bodyBuffer)
+               }
+               lastReadSize, err = reader.Read(bodyBuffer[0:readSize])
+               if err != nil {
+                       if err == protocol.ErrNotComplete {
+                               return nil, protocol.ParseResultSkipPackage, nil
+                       }
+                       if err == io.EOF && reduceSize-lastReadSize <= 0 {
+                               return nil, protocol.ParseResultSuccess, nil
+                       }
+                       return nil, protocol.ParseResultSkipPackage, err
+               }
+               reduceSize -= lastReadSize
+       }
+       endPosition := buf.OffsetPosition(-reader.Buffered())
+       slice := buf.Slice(true, startPosition, endPosition)
+       if detectedNotSending {
+               if pos := slice.DetectNotSendingLastPosition(); pos != nil {
+                       log.Debugf("found the socket data not sending finished 
in BPF, so update the body to the latest data, %v", pos)
+                       endPosition = pos
+                       slice = buf.Slice(true, startPosition, endPosition)
+               }
+       }
+
+       return slice, protocol.ParseResultSuccess, nil
+}
+
+type charsetReadWrapper struct {
+       reader io.Reader
+}
+
+func newCharsetReader(r io.Reader, contentType string) (*charsetReadWrapper, 
error) {
+       reader, err := charset.NewReader(r, contentType)
+       if err != nil {
+               return nil, err
+       }
+       return &charsetReadWrapper{reader: reader}, nil
+}
+
+func (c *charsetReadWrapper) Read(p []byte) (n int, err error) {
+       return c.reader.Read(p)
+}
+
+func (c *charsetReadWrapper) Close() error {
+       return nil
+}
diff --git 
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go 
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
new file mode 100644
index 0000000..1e2445e
--- /dev/null
+++ 
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
@@ -0,0 +1,143 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package reader
+
+import (
+       "bufio"
+       "fmt"
+       "net/http"
+       "net/textproto"
+       "net/url"
+       "strings"
+
+       protocol 
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
+)
+
+type Request struct {
+       *MessageOpt
+       original     *http.Request
+       headerBuffer *protocol.Buffer
+       bodyBuffer   *protocol.Buffer
+}
+
+func (r *Request) Headers() http.Header {
+       return r.original.Header
+}
+
+func (r *Request) HeaderBuffer() *protocol.Buffer {
+       return r.headerBuffer
+}
+
+func (r *Request) BodyBuffer() *protocol.Buffer {
+       return r.bodyBuffer
+}
+
+func (r *Request) MinDataID() int {
+       return int(r.headerBuffer.FirstSocketBuffer().DataID())
+}
+
+func (r *Request) RequestURI() string {
+       return r.original.RequestURI
+}
+
+func ReadRequest(buf *protocol.Buffer) (*Request, protocol.ParseResult, error) 
{
+       bufReader := bufio.NewReader(buf)
+       tp := textproto.NewReader(bufReader)
+       req := &http.Request{}
+       result := &Request{original: req}
+       result.MessageOpt = &MessageOpt{result}
+
+       headerStartPosition := buf.Position()
+       line, err := tp.ReadLine()
+       if err != nil {
+               return nil, protocol.ParseResultSkipPackage, fmt.Errorf("read 
request first lint failure: %v", err)
+       }
+       method, rest, ok1 := strings.Cut(line, " ")
+       requestURI, proto, ok2 := strings.Cut(rest, " ")
+       if !ok1 || !ok2 {
+               return nil, protocol.ParseResultSkipPackage, fmt.Errorf("the 
first line is not request: %s", line)
+       }
+
+       isRequest := false
+       for _, m := range requestMethods {
+               if method == m {
+                       isRequest = true
+                       break
+               }
+       }
+       if !isRequest {
+               return nil, protocol.ParseResultSkipPackage, fmt.Errorf("is not 
request: %s", method)
+       }
+       major, minor, ok := http.ParseHTTPVersion(proto)
+       if !ok {
+               return nil, protocol.ParseResultSkipPackage, fmt.Errorf("the 
protocol version cannot be identity: %s", proto)
+       }
+       justAuthority := req.Method == "CONNECT" && 
!strings.HasPrefix(requestURI, "/")
+       if justAuthority {
+               requestURI = "http://"; + requestURI
+       }
+       uri, err := url.ParseRequestURI(requestURI)
+       if err != nil {
+               return nil, protocol.ParseResultSkipPackage, err
+       }
+       req.Method, req.URL, req.RequestURI = method, uri, requestURI
+       req.Proto, req.ProtoMajor, req.ProtoMinor = proto, major, minor
+
+       // header reader
+       mimeHeader, err := tp.ReadMIMEHeader()
+       if err != nil {
+               return nil, protocol.ParseResultSkipPackage, err
+       }
+       req.Header = http.Header(mimeHeader)
+
+       req.Host = req.URL.Host
+       if req.Host == "" {
+               req.Host = req.Header.Get("Host")
+       }
+
+       result.buildHeaderBuffer(headerStartPosition, buf, bufReader)
+       if b, r, err := result.readFullBody(bufReader, buf); err != nil {
+               return nil, protocol.ParseResultSkipPackage, err
+       } else if r != protocol.ParseResultSuccess {
+               return nil, r, nil
+       } else {
+               result.bodyBuffer = b
+       }
+
+       return result, protocol.ParseResultSuccess, nil
+}
+
+func (r *Request) buildHeaderBuffer(start *protocol.BufferPosition, buf 
*protocol.Buffer, bufReader *bufio.Reader) {
+       endPosition := buf.OffsetPosition(-bufReader.Buffered())
+       r.headerBuffer = buf.Slice(true, start, endPosition)
+}
+
+func (r *Request) readFullBody(bodyReader *bufio.Reader, original 
*protocol.Buffer) (*protocol.Buffer, protocol.ParseResult, error) {
+       length, err := r.appointedLength()
+       if err != nil {
+               return nil, protocol.ParseResultSkipPackage, err
+       } else if length > 0 {
+               return r.checkBodyWithSize(original, bodyReader, length, true)
+       }
+
+       if r.isChunked() {
+               return r.checkChunkedBody(original, bodyReader)
+       }
+
+       return r.readBodyUntilCurrentPackageFinished(original, bodyReader)
+}
diff --git 
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go 
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go
new file mode 100644
index 0000000..e1b04ad
--- /dev/null
+++ 
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go
@@ -0,0 +1,125 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package reader
+
+import (
+       "bufio"
+       "fmt"
+       "net/http"
+       "net/textproto"
+       "strconv"
+       "strings"
+
+       protocol 
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
+)
+
+type Response struct {
+       *MessageOpt
+       req          *Request
+       original     *http.Response
+       headerBuffer *protocol.Buffer
+       bodyBuffer   *protocol.Buffer
+}
+
+func (r *Response) Headers() http.Header {
+       return r.original.Header
+}
+
+func (r *Response) HeaderBuffer() *protocol.Buffer {
+       return r.headerBuffer
+}
+
+func (r *Response) BodyBuffer() *protocol.Buffer {
+       return r.bodyBuffer
+}
+
+func (r *Response) StatusCode() int {
+       return r.original.StatusCode
+}
+
+func ReadResponse(req *Request, buf *protocol.Buffer) (*Response, 
protocol.ParseResult, error) {
+       bufReader := bufio.NewReader(buf)
+       tp := textproto.NewReader(bufReader)
+       resp := &http.Response{}
+       result := &Response{original: resp, req: req}
+       result.MessageOpt = &MessageOpt{result}
+
+       headerStartPosition := buf.Position()
+       line, err := tp.ReadLine()
+       if err != nil {
+               return nil, protocol.ParseResultSkipPackage, fmt.Errorf("read 
response first line failure: %v", err)
+       }
+       indexByte := strings.IndexByte(line, ' ')
+       if indexByte == -1 {
+               return nil, protocol.ParseResultSkipPackage, 
fmt.Errorf("parsing response error: %s", line)
+       }
+       resp.Proto = line[:indexByte]
+       resp.Status = strings.TrimLeft(line[indexByte+1:], " ")
+       statusCode := resp.Status
+       if i := strings.IndexByte(resp.Status, ' '); i != -1 {
+               statusCode = resp.Status[:i]
+       }
+       if len(statusCode) != 3 {
+               return nil, protocol.ParseResultSkipPackage, 
fmt.Errorf("parsing response status code failure: %v", statusCode)
+       }
+       resp.StatusCode, err = strconv.Atoi(statusCode)
+       if err != nil || resp.StatusCode < 0 {
+               return nil, protocol.ParseResultSkipPackage, fmt.Errorf("status 
code not correct: %s", statusCode)
+       }
+       var ok bool
+       if resp.ProtoMajor, resp.ProtoMinor, ok = 
http.ParseHTTPVersion(resp.Proto); !ok {
+               return nil, protocol.ParseResultSkipPackage, 
fmt.Errorf("parsing http version failure: %s", resp.Proto)
+       }
+
+       mimeHeader, err := tp.ReadMIMEHeader()
+       if err != nil {
+               result.buildHeaderBuffer(headerStartPosition, buf, bufReader)
+               return result, protocol.ParseResultSuccess, nil
+       }
+       resp.Header = http.Header(mimeHeader)
+
+       result.buildHeaderBuffer(headerStartPosition, buf, bufReader)
+       if b, r, err := result.readFullResponseBody(bufReader, buf); err != nil 
{
+               return nil, protocol.ParseResultSkipPackage, err
+       } else if r != protocol.ParseResultSuccess {
+               return nil, r, nil
+       } else {
+               result.bodyBuffer = b
+       }
+       return result, protocol.ParseResultSuccess, nil
+}
+
+func (r *Response) buildHeaderBuffer(start *protocol.BufferPosition, buf 
*protocol.Buffer, bufReader *bufio.Reader) {
+       endPosition := buf.OffsetPosition(-bufReader.Buffered())
+       r.headerBuffer = buf.Slice(true, start, endPosition)
+}
+
+func (r *Response) readFullResponseBody(bodyReader *bufio.Reader, original 
*protocol.Buffer) (*protocol.Buffer, protocol.ParseResult, error) {
+       length, err := r.appointedLength()
+       if err != nil {
+               return nil, protocol.ParseResultSkipPackage, err
+       } else if length > 0 {
+               return r.checkBodyWithSize(original, bodyReader, length, true)
+       }
+
+       if r.isChunked() {
+               return r.checkChunkedBody(original, bodyReader)
+       }
+
+       return r.readBodyUntilCurrentPackageFinished(original, bodyReader)
+}
diff --git 
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/sampling.go 
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/sampling.go
index 75f602a..6a98c12 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/sampling.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/sampling.go
@@ -19,7 +19,6 @@ package http1
 
 import (
        "fmt"
-       "net/http"
        "regexp"
        "strings"
        "time"
@@ -30,6 +29,7 @@ import (
        profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
        
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
        protocol 
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
+       
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader"
        
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/metrics"
 )
 
@@ -52,13 +52,12 @@ func NewSampler() *Sampler {
        }
 }
 
-func (s *Sampler) AppendMetrics(config *SamplingConfig, duration time.Duration,
-       request *http.Request, response *http.Response, reqBuffer, respBuffer 
protocol.SocketDataBuffer) {
+func (s *Sampler) AppendMetrics(config *SamplingConfig, duration 
time.Duration, request *reader.Request, response *reader.Response) {
        if config == nil {
                return
        }
        tracingContext, err := protocol.AnalyzeTracingContext(func(key string) 
string {
-               return request.Header.Get(key)
+               return request.Headers().Get(key)
        })
        if err != nil {
                log.Warnf("analyze tracing context error: %v", err)
@@ -68,7 +67,7 @@ func (s *Sampler) AppendMetrics(config *SamplingConfig, 
duration time.Duration,
                return
        }
 
-       uri := request.RequestURI
+       uri := request.RequestURI()
        // remove the query parameters
        if i := strings.Index(uri, "?"); i > 0 {
                uri = uri[0:i]
@@ -82,10 +81,10 @@ func (s *Sampler) AppendMetrics(config *SamplingConfig, 
duration time.Duration,
 
        var traceType string
        var topN *metrics.TopN
-       if rule.When5XX && response.StatusCode >= 500 && response.StatusCode < 
600 {
+       if rule.When5XX && response.StatusCode() >= 500 && 
response.StatusCode() < 600 {
                traceType = "status_5xx"
                topN = s.Error5xxTraces
-       } else if rule.When4XX && response.StatusCode >= 400 && 
response.StatusCode < 500 {
+       } else if rule.When4XX && response.StatusCode() >= 400 && 
response.StatusCode() < 500 {
                traceType = "status_4xx"
                topN = s.Error4xxTraces
        } else if rule.MinDuration != nil && int64(*rule.MinDuration) <= 
duration.Milliseconds() {
@@ -96,15 +95,13 @@ func (s *Sampler) AppendMetrics(config *SamplingConfig, 
duration time.Duration,
        }
 
        trace := &Trace{
-               Trace:          tracingContext,
-               RequestURI:     uri,
-               RequestBuffer:  reqBuffer,
-               ResponseBuffer: respBuffer,
-               Request:        request,
-               Response:       response,
-               Type:           traceType,
-               Settings:       rule.Settings,
-               TaskConfig:     config.ProfilingSampling,
+               Trace:      tracingContext,
+               RequestURI: uri,
+               Request:    request,
+               Response:   response,
+               Type:       traceType,
+               Settings:   rule.Settings,
+               TaskConfig: config.ProfilingSampling,
        }
        topN.AddRecord(trace, duration.Milliseconds())
 }
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go 
b/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
index 4028039..d3db212 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
@@ -23,6 +23,8 @@ import (
        
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
        protocol 
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
        
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/http1"
+
+       "golang.org/x/net/context"
 )
 
 var log = logger.GetLogger("profiling", "task", "network", "layer7", 
"protocols")
@@ -43,15 +45,16 @@ func init() {
 
 type Analyzer struct {
        ctx       protocol.Context
-       protocols []protocol.Protocol
+       protocols map[base.ConnectionProtocol]*protocol.ProtocolAnalyzer
 }
 
 func NewAnalyzer(ctx protocol.Context, config *profiling.TaskConfig) *Analyzer 
{
-       protocols := make([]protocol.Protocol, 0)
+       protocols := 
make(map[base.ConnectionProtocol]*protocol.ProtocolAnalyzer)
        for _, r := range registerProtocols {
                p := r()
                p.Init(config)
-               protocols = append(protocols, p)
+               analyzer := protocol.NewProtocolAnalyzer(ctx, p, config)
+               protocols[p.Protocol()] = analyzer
        }
        return &Analyzer{
                ctx:       ctx,
@@ -59,14 +62,20 @@ func NewAnalyzer(ctx protocol.Context, config 
*profiling.TaskConfig) *Analyzer {
        }
 }
 
-func (a *Analyzer) ReceiveSocketDataEvent(event 
*protocol.SocketDataUploadEvent) {
+func (a *Analyzer) Start(ctx context.Context) {
        for _, p := range a.protocols {
-               if p.ReceiveData(a.ctx, event) {
-                       return
-               }
+               p.Start(ctx)
+       }
+}
+
+func (a *Analyzer) ReceiveSocketDataEvent(event 
*protocol.SocketDataUploadEvent) {
+       analyzer := a.protocols[event.Protocol]
+       if analyzer == nil {
+               log.Warnf("could not found any protocol to handle socket data, 
connection id: %s, protocol: %s(%d)",
+                       event.GenerateConnectionID(), event.Protocol.String(), 
event.Protocol)
+               return
        }
-       log.Warnf("could not found any protocol to handle socket data, 
connection id: %s, protocol: %s(%d), type: %s",
-               event.GenerateConnectionID(), event.Protocol.String(), 
event.Protocol, event.MsgType.String())
+       analyzer.ReceiveSocketData(a.ctx, event)
 }
 
 func (a *Analyzer) UpdateExtensionConfig(config *profiling.ExtensionConfig) {
@@ -76,19 +85,19 @@ func (a *Analyzer) UpdateExtensionConfig(config 
*profiling.ExtensionConfig) {
 }
 
 type ProtocolMetrics struct {
-       data map[string]protocol.Metrics
+       data map[base.ConnectionProtocol]protocol.Metrics
 }
 
 func NewProtocolMetrics() *ProtocolMetrics {
-       metrics := make(map[string]protocol.Metrics)
+       metrics := make(map[base.ConnectionProtocol]protocol.Metrics)
        for _, p := range defaultInstances {
-               metrics[p.Name()] = p.GenerateMetrics()
+               metrics[p.Protocol()] = p.GenerateMetrics()
        }
        return &ProtocolMetrics{data: metrics}
 }
 
-func (m *ProtocolMetrics) GetProtocolMetrics(name string) protocol.Metrics {
-       return m.data[name]
+func (m *ProtocolMetrics) GetProtocolMetrics(p base.ConnectionProtocol) 
protocol.Metrics {
+       return m.data[p]
 }
 
 func (m *ProtocolMetrics) MergeMetricsFromConnection(connection 
*base.ConnectionContext, data base.ConnectionMetrics) {
diff --git a/pkg/profiling/task/network/analyze/layer7/queue.go 
b/pkg/profiling/task/network/analyze/layer7/queue.go
index 352059d..4c6ffa9 100644
--- a/pkg/profiling/task/network/analyze/layer7/queue.go
+++ b/pkg/profiling/task/network/analyze/layer7/queue.go
@@ -28,6 +28,7 @@ import (
 )
 
 type PartitionContext interface {
+       Start(ctx context.Context)
        Consume(data interface{})
 }
 
@@ -72,8 +73,9 @@ func (e *EventQueue) start0(ctx context.Context, bpfLoader 
*bpf.Loader, emap *eb
        }
 
        for i := 0; i < len(e.partitions); i++ {
-               go func(inx int) {
+               go func(ctx context.Context, inx int) {
                        p := e.partitions[inx]
+                       p.ctx.Start(ctx)
                        for {
                                select {
                                // consume the data
@@ -84,7 +86,7 @@ func (e *EventQueue) start0(ctx context.Context, bpfLoader 
*bpf.Loader, emap *eb
                                        return
                                }
                        }
-               }(i)
+               }(ctx, i)
        }
 }
 
diff --git a/pkg/tools/host/time.go b/pkg/tools/host/time.go
index 7cc8a78..15e22bd 100644
--- a/pkg/tools/host/time.go
+++ b/pkg/tools/host/time.go
@@ -41,10 +41,14 @@ func init() {
 }
 
 func TimeToInstant(bpfTime uint64) *v3.Instant {
-       timeCopy := time.Unix(BootTime.Unix(), int64(BootTime.Nanosecond()))
-       result := timeCopy.Add(time.Duration(bpfTime))
+       result := Time(bpfTime)
        return &v3.Instant{
                Seconds: result.Unix(),
                Nanos:   int32(result.Nanosecond()),
        }
 }
+
+func Time(bpfTime uint64) time.Time {
+       timeCopy := time.Unix(BootTime.Unix(), int64(BootTime.Nanosecond()))
+       return timeCopy.Add(time.Duration(bpfTime))
+}

Reply via email to