This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/main by this push:
     new fc8d074  Adapt protocol update and fix some continuous profiling bugs (#80)
fc8d074 is described below

commit fc8d074c6d34ecfee585a7097cbd5aef1ca680a5
Author: mrproliu <[email protected]>
AuthorDate: Thu Mar 2 13:46:08 2023 +0800

    Adapt protocol update and fix some continuous profiling bugs (#80)
---
 go.mod                                             |  10 +-
 go.sum                                             |  19 +-
 pkg/profiling/continuous/base/metrics.go           |   4 +-
 pkg/profiling/continuous/base/windows.go           |  49 +++--
 pkg/profiling/continuous/base/windows_test.go      | 218 +++++++++++++++++++++
 pkg/profiling/continuous/checker/common/causes.go  |  24 +--
 .../continuous/checker/common/http_checker.go      |  12 +-
 .../continuous/checker/common/process_checker.go   |  10 +-
 .../continuous/checker/common/system_checker.go    |  10 +-
 .../continuous/checker/network_error_rate.go       |   2 +-
 .../continuous/checker/network_response_time.go    |   2 +-
 pkg/profiling/continuous/checker/process_cpu.go    |   2 +-
 pkg/profiling/continuous/checker/process_thread.go |   2 +-
 pkg/profiling/continuous/checker/system_load.go    |   2 +-
 pkg/profiling/continuous/checkers.go               |  19 +-
 15 files changed, 316 insertions(+), 69 deletions(-)

diff --git a/go.mod b/go.mod
index 8696dea..7dea1cc 100644
--- a/go.mod
+++ b/go.mod
@@ -18,14 +18,14 @@ require (
        github.com/stretchr/testify v1.8.1
        github.com/zekroTJA/timedmap v1.4.0
        golang.org/x/arch v0.0.0-20220722155209-00200b7164a7
-       golang.org/x/net v0.0.0-20220722155237-a158d28d115b
-       golang.org/x/sys v0.3.0
+       golang.org/x/net v0.6.0
+       golang.org/x/sys v0.5.0
        google.golang.org/grpc v1.44.0
        k8s.io/api v0.23.5
        k8s.io/apimachinery v0.23.5
        k8s.io/client-go v0.23.5
        k8s.io/utils v0.0.0-20211116205334-6203023598ed
-       skywalking.apache.org/repo/goapi v0.0.0-20230221054914-eeacbf544e9c
+       skywalking.apache.org/repo/goapi v0.0.0-20230301143132-aa3f8469385b
 )
 
 require (
@@ -57,8 +57,8 @@ require (
        github.com/tklauser/numcpus v0.3.0 // indirect
        github.com/yusufpapurcu/wmi v1.2.2 // indirect
        golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect
-       golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
-       golang.org/x/text v0.3.8 // indirect
+       golang.org/x/term v0.5.0 // indirect
+       golang.org/x/text v0.7.0 // indirect
        golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect
        google.golang.org/appengine v1.6.7 // indirect
        google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // 
indirect
diff --git a/go.sum b/go.sum
index 78b8e05..3a3798e 100644
--- a/go.sum
+++ b/go.sum
@@ -560,8 +560,9 @@ golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1/go.mod 
h1:9tjilg8BloeKEkVJvy
 golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod 
h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod 
h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20211209124913-491a49abca63/go.mod 
h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
-golang.org/x/net v0.0.0-20220722155237-a158d28d115b 
h1:PxfKdU9lEEDYjdIzOtC4qFWgkU2rGHdKlKowJSMN9h0=
 golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod 
h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
+golang.org/x/net v0.6.0 h1:L4ZwwTvKW9gr0ZMS1yrHD9GZhIuVjOBBnaKH+SPQK0Q=
+golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod 
h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
 golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod 
h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
 golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod 
h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -667,13 +668,13 @@ golang.org/x/sys 
v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBc
 golang.org/x/sys v0.0.0-20211205182925-97ca703d548d/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ=
-golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
+golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod 
h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
 golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod 
h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
-golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 
h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY=
 golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod 
h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
+golang.org/x/term v0.5.0 h1:n2a8QNdAb0sZNpU9R1ALUXBbY+w51fCQDN+7EdxNBsY=
+golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
 golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod 
h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod 
h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -683,8 +684,8 @@ golang.org/x/text v0.3.4/go.mod 
h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
-golang.org/x/text v0.3.8 h1:nAL+RVCQ9uMn3vJZbV+MRnydTJFPf8qqY42YiA6MrqY=
-golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
+golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo=
+golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
 golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod 
h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod 
h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod 
h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
@@ -963,5 +964,5 @@ sigs.k8s.io/structured-merge-diff/v4 v4.2.1 
h1:bKCqE9GvQ5tiVHn5rfn1r+yao3aLQEaLz
 sigs.k8s.io/structured-merge-diff/v4 v4.2.1/go.mod 
h1:j/nl6xW8vLS49O8YvXW1ocPhZawJtm+Yrr7PPRQ0Vg4=
 sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
 sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
-skywalking.apache.org/repo/goapi v0.0.0-20230221054914-eeacbf544e9c 
h1:UUcBWaN9cUdtYqdj9ssIcL/BuHD4jT17smPgfpZTcVg=
-skywalking.apache.org/repo/goapi v0.0.0-20230221054914-eeacbf544e9c/go.mod 
h1:BS5LRvsAMmZn8YIW9n0+8eiJhC9zVn663fDr5t+cL40=
+skywalking.apache.org/repo/goapi v0.0.0-20230301143132-aa3f8469385b 
h1:VAWQr1mJk4P/a8VZ9UASY8H53wj0zdLHgYvhddyQcXw=
+skywalking.apache.org/repo/goapi v0.0.0-20230301143132-aa3f8469385b/go.mod 
h1:WovoDv1GA+8VuvHPVJL7q/fL0KlYPBZq5rTMCFQRzJU=
diff --git a/pkg/profiling/continuous/base/metrics.go 
b/pkg/profiling/continuous/base/metrics.go
index 6b48015..55029b0 100644
--- a/pkg/profiling/continuous/base/metrics.go
+++ b/pkg/profiling/continuous/base/metrics.go
@@ -45,7 +45,9 @@ func (m *MetricsAppender) AppendProcessSingleValue(name 
string, p api.ProcessInt
        for k, v := range labels {
                transformLabels = append(transformLabels, &v3.Label{Name: k, 
Value: v})
        }
-       transformLabels = append(transformLabels, &v3.Label{Name: 
"process_name", Value: p.Entity().ProcessName})
+       transformLabels = append(transformLabels,
+               &v3.Label{Name: "process_name", Value: p.Entity().ProcessName},
+               &v3.Label{Name: "layer", Value: p.Entity().Layer})
        metadata := serviceInstanceMetadata{
                service:  p.Entity().ServiceName,
                instance: p.Entity().InstanceName,
diff --git a/pkg/profiling/continuous/base/windows.go 
b/pkg/profiling/continuous/base/windows.go
index c400a64..6b9ab49 100644
--- a/pkg/profiling/continuous/base/windows.go
+++ b/pkg/profiling/continuous/base/windows.go
@@ -61,8 +61,8 @@ type TimeWindows[V any, R any] struct {
        windowLocker    sync.RWMutex
        windowGenerator func() WindowData[V, R]
 
-       lastFlushedElement *list.Element
-       lastWriteElement   *list.Element
+       // mark the latest flush endTime
+       lastFlushTime *time.Time
 }
 
 func NewTimeWindows[V any, R any](items []*PolicyItem, generator func() 
WindowData[V, R]) *TimeWindows[V, R] {
@@ -143,7 +143,7 @@ func (t *TimeWindows[D, R]) Add(tm time.Time, val D) {
                second = 0
        }
 
-       if second > t.data.Len() {
+       if second >= t.data.Len() {
                // add the older data, ignore it
                return
        }
@@ -151,29 +151,47 @@ func (t *TimeWindows[D, R]) Add(tm time.Time, val D) {
        t.appendDataToSlot(t.data.Len()-second-1, val)
 }
 
-func (t *TimeWindows[D, R]) FlushLastWriteData() (R, bool) {
-       if t.lastWriteElement == nil || t.lastFlushedElement == 
t.lastWriteElement {
+func (t *TimeWindows[D, R]) FlushMostRecentData() (R, bool) {
+       endTime := t.endTime
+       if !t.shouldFlush(endTime) {
                var empty R
                return empty, false
        }
-       t.lastFlushedElement = t.lastWriteElement
-       return t.lastFlushedElement.Value.(*windowDataWrapper[D, R]).Get(), true
+       t.lastFlushTime = endTime
+       return t.data.Back().Value.(*windowDataWrapper[D, R]).Get(), true
 }
 
-func (t *TimeWindows[D, R]) FlushMultipleWriteData() ([]R, bool) {
-       result := make([]R, 0)
-       if t.lastWriteElement == nil || t.lastFlushedElement == 
t.lastWriteElement {
+func (t *TimeWindows[D, R]) FlushMultipleRecentData() ([]R, bool) {
+       endTime := t.endTime
+       if !t.shouldFlush(endTime) {
                return nil, false
        }
-       for e := t.lastWriteElement; e != t.lastFlushedElement && e != nil; e = 
e.Prev() {
+       result := make([]R, 0)
+       slotCount := t.data.Len()
+       if t.lastFlushTime != nil {
+               slotCount = int(t.endTime.Sub(*t.lastFlushTime).Seconds()) - 1
+       }
+       for e := t.data.Back(); e != nil && slotCount >= 0; e = e.Prev() {
                if e.Value.(*windowDataWrapper[D, R]).hasData {
                        result = append(result, e.Value.(*windowDataWrapper[D, 
R]).Get())
                }
+               slotCount--
        }
-       t.lastFlushedElement = t.lastWriteElement
+       t.lastFlushTime = endTime
        return result, true
 }
 
+func (t *TimeWindows[D, R]) shouldFlush(endTime *time.Time) bool {
+       if endTime == nil {
+               return false
+       }
+       if t.lastFlushTime == nil {
+               return true
+       }
+
+       return t.lastFlushTime != endTime && t.lastFlushTime.Before(*endTime)
+}
+
 func (t *TimeWindows[D, R]) moveTo(tm time.Time) {
        t.windowLocker.Lock()
        defer t.windowLocker.Unlock()
@@ -190,9 +208,9 @@ func (t *TimeWindows[D, R]) moveTo(tm time.Time) {
        } else {
                for i := 0; i < addSeconds; i++ {
                        // remove the older data
-                       first := 
t.data.Remove(t.data.Back()).(*windowDataWrapper[D, R])
+                       first := 
t.data.Remove(t.data.Front()).(*windowDataWrapper[D, R])
                        first.Reset()
-                       t.data.PushFront(first)
+                       t.data.PushBack(first)
                }
        }
        t.endTime = &tm
@@ -202,7 +220,7 @@ func (t *TimeWindows[V, R]) appendDataToSlot(index int, 
data V) {
        t.windowLocker.RLock()
        defer t.windowLocker.RUnlock()
 
-       if index <= 0 || index >= t.data.Len() {
+       if index < 0 || index > t.data.Len() {
                return
        }
 
@@ -223,7 +241,6 @@ func (t *TimeWindows[V, R]) appendDataToSlot(index int, 
data V) {
        }
 
        element.Value.(*windowDataWrapper[V, R]).Accept(data)
-       t.lastWriteElement = element
 }
 
 type windowDataWrapper[D any, R any] struct {
diff --git a/pkg/profiling/continuous/base/windows_test.go 
b/pkg/profiling/continuous/base/windows_test.go
new file mode 100644
index 0000000..3a5c335
--- /dev/null
+++ b/pkg/profiling/continuous/base/windows_test.go
@@ -0,0 +1,218 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package base
+
+import (
+       "reflect"
+       "testing"
+       "time"
+)
+
+var (
+       defaultTime = time.Now()
+)
+
+func TestAdd(t *testing.T) {
+       tests := []struct {
+               name   string
+               size   int
+               dataOp func(window *TimeWindows[float64, float64])
+               result []float64
+       }{
+               {
+                       name: "normal",
+                       size: 3,
+                       dataOp: func(window *TimeWindows[float64, float64]) {
+                               addData(window, 1, 1)
+                               addData(window, 2, 2)
+                               addData(window, 3, 3)
+                       },
+                       result: []float64{1, 2, 3},
+               },
+               {
+                       name: "out of count",
+                       size: 3,
+                       dataOp: func(window *TimeWindows[float64, float64]) {
+                               addData(window, 1, 1)
+                               addData(window, 2, 2)
+                               addData(window, 3, 3)
+                               addData(window, 4, 4)
+                       },
+                       result: []float64{2, 3, 4},
+               },
+               {
+                       name: "add older data",
+                       size: 3,
+                       dataOp: func(window *TimeWindows[float64, float64]) {
+                               addData(window, 1, 1)
+                               addData(window, 2, 2)
+                               addData(window, 3, 3)
+                               addData(window, 4, 4)
+                               addData(window, 2, 4)
+                               addData(window, 1, 8)
+                               addData(window, 0, 10)
+                       },
+                       result: []float64{4, 3, 4},
+               },
+               {
+                       name: "add new data which bigger than windows count",
+                       size: 3,
+                       dataOp: func(window *TimeWindows[float64, float64]) {
+                               addData(window, 1, 1)
+                               addData(window, 4, 2)
+                               addData(window, 5, 3)
+                               addData(window, 7, 5)
+                       },
+                       result: []float64{3, 0, 5},
+               },
+       }
+
+       for _, tt := range tests {
+               windows := timeWindows(tt.size)
+               tt.dataOp(windows)
+               actualValue := getAllValues(windows)
+               if !reflect.DeepEqual(tt.result, actualValue) {
+                       t.Fatalf("test [%s] failure, expceted: %v, actual: %v", 
tt.name, tt.result, actualValue)
+               }
+       }
+}
+
+func TestFlushData(t *testing.T) {
+       tests := []struct {
+               name       string
+               size       int
+               operations []interface{}
+       }{
+               {
+                       name: "normal[most recent]",
+                       size: 3,
+                       operations: []interface{}{
+                               appendDataOperation{1, 1},
+                               mostRecentDataChecker{1, true},
+                               mostRecentDataChecker{0, false},
+
+                               appendDataOperation{2, 2},
+                               appendDataOperation{3, 3},
+                               mostRecentDataChecker{3, true},
+                               mostRecentDataChecker{0, false},
+                       },
+               },
+               {
+                       name: "has older data[most recent]",
+                       size: 3,
+                       operations: []interface{}{
+                               appendDataOperation{1, 1},
+                               appendDataOperation{2, 2},
+                               appendDataOperation{1, 3},
+                               mostRecentDataChecker{2, true},
+                               mostRecentDataChecker{0, false},
+
+                               appendDataOperation{3, 3},
+                               appendDataOperation{4, 4},
+                               appendDataOperation{1, 9},
+                               mostRecentDataChecker{4, true},
+                               mostRecentDataChecker{0, false},
+                       },
+               },
+               {
+                       name: "normal[multiple recent]",
+                       size: 3,
+                       operations: []interface{}{
+                               appendDataOperation{1, 1},
+                               appendDataOperation{2, 2},
+                               appendDataOperation{3, 3},
+                               multipleRecentDataChecker{[]float64{3, 2, 1}, 
true},
+                               multipleRecentDataChecker{nil, false},
+
+                               appendDataOperation{4, 4},
+                               multipleRecentDataChecker{[]float64{4}, true},
+                               multipleRecentDataChecker{nil, false},
+                       },
+               },
+               {
+                       name: "has older data[multiple recent]",
+                       size: 3,
+                       operations: []interface{}{
+                               appendDataOperation{1, 1},
+                               appendDataOperation{2, 2},
+                               appendDataOperation{4, 4},
+                               multipleRecentDataChecker{[]float64{4, 2}, 
true},
+                               multipleRecentDataChecker{nil, false},
+
+                               appendDataOperation{1, 1},
+                               multipleRecentDataChecker{nil, false},
+                       },
+               },
+       }
+
+       for _, tt := range tests {
+               windows := timeWindows(tt.size)
+               for _, op := range tt.operations {
+                       switch v := op.(type) {
+                       case appendDataOperation:
+                               addData(windows, v.second, v.value)
+                       case mostRecentDataChecker:
+                               val, hasData := windows.FlushMostRecentData()
+                               if val != v.value || v.hasData != hasData {
+                                       t.Fatalf("test[%s] failure, excepted: 
%v-%t, actual: %v-%t", tt.name,
+                                               v.value, v.hasData, val, 
hasData)
+                               }
+                       case multipleRecentDataChecker:
+                               val, hasData := 
windows.FlushMultipleRecentData()
+                               if !reflect.DeepEqual(val, v.value) || 
v.hasData != hasData {
+                                       t.Fatalf("test[%s] failure, excepted: 
%v-%t, actual: %v-%t", tt.name,
+                                               v.value, v.hasData, val, 
hasData)
+                               }
+                       }
+               }
+       }
+}
+
+type appendDataOperation struct {
+       second int
+       value  float64
+}
+
+type mostRecentDataChecker struct {
+       value   float64
+       hasData bool
+}
+
+type multipleRecentDataChecker struct {
+       value   []float64
+       hasData bool
+}
+
+func addData(win *TimeWindows[float64, float64], second int, val float64) {
+       result := defaultTime.Add(time.Second * time.Duration(second))
+       win.Add(result, val)
+}
+
+func timeWindows(count int) *TimeWindows[float64, float64] {
+       return NewTimeWindows([]*PolicyItem{{Period: count}}, func() 
WindowData[float64, float64] {
+               return NewLatestWindowData[float64]()
+       })
+}
+
+func getAllValues(win *TimeWindows[float64, float64]) []float64 {
+       result := make([]float64, 0)
+       for e := win.data.Front(); e != nil; e = e.Next() {
+               result = append(result, e.Value.(*windowDataWrapper[float64, 
float64]).Get())
+       }
+       return result
+}
diff --git a/pkg/profiling/continuous/checker/common/causes.go 
b/pkg/profiling/continuous/checker/common/causes.go
index 8955569..e9a1a70 100644
--- a/pkg/profiling/continuous/checker/common/causes.go
+++ b/pkg/profiling/continuous/checker/common/causes.go
@@ -27,18 +27,18 @@ import (
 type SingleValueCause struct {
        process            api.ProcessInterface
        policy             *base.PolicyItem
-       causeType          v3.ContinuousProfilingCauseType
+       monitorType        v3.ContinuousProfilingTriggeredMonitorType
        threshold, current float64
 }
 
-func NewSingleValueCause(p api.ProcessInterface, policyItem *base.PolicyItem, 
causeType v3.ContinuousProfilingCauseType,
+func NewSingleValueCause(p api.ProcessInterface, policyItem *base.PolicyItem, 
monitorType v3.ContinuousProfilingTriggeredMonitorType,
        threshold, current float64) *SingleValueCause {
        return &SingleValueCause{
-               process:   p,
-               policy:    policyItem,
-               causeType: causeType,
-               threshold: threshold,
-               current:   current,
+               process:     p,
+               policy:      policyItem,
+               monitorType: monitorType,
+               threshold:   threshold,
+               current:     current,
        }
 }
 
@@ -52,7 +52,7 @@ func (p *SingleValueCause) FromPolicy() *base.PolicyItem {
 
 func (p *SingleValueCause) GenerateTransferCause() 
*v3.ContinuousProfilingCause {
        return &v3.ContinuousProfilingCause{
-               Type: p.causeType,
+               Type: p.monitorType,
                Cause: &v3.ContinuousProfilingCause_SingleValue{
                        SingleValue: &v3.ContinuousProfilingSingleValueCause{
                                Threshold: p.threshold,
@@ -68,16 +68,16 @@ type URICause struct {
 
        process            api.ProcessInterface
        policy             *base.PolicyItem
-       causeType          v3.ContinuousProfilingCauseType
+       causeType          v3.ContinuousProfilingTriggeredMonitorType
        threshold, current float64
 }
 
-func NewURICause(p api.ProcessInterface, isRegex bool, uri string, policyItem 
*base.PolicyItem, causeType v3.ContinuousProfilingCauseType,
-       threshold, current float64) *URICause {
+func NewURICause(p api.ProcessInterface, isRegex bool, uri string, policyItem 
*base.PolicyItem,
+       monitorType v3.ContinuousProfilingTriggeredMonitorType, threshold, 
current float64) *URICause {
        return &URICause{
                process:   p,
                policy:    policyItem,
-               causeType: causeType,
+               causeType: monitorType,
                IsRegex:   isRegex,
                URI:       uri,
                threshold: threshold,
diff --git a/pkg/profiling/continuous/checker/common/http_checker.go 
b/pkg/profiling/continuous/checker/common/http_checker.go
index 383f5ca..3952787 100644
--- a/pkg/profiling/continuous/checker/common/http_checker.go
+++ b/pkg/profiling/continuous/checker/common/http_checker.go
@@ -32,17 +32,17 @@ type HTTPBasedChecker[Data 
base.WindowData[network.BufferEvent, float64]] struct
        *BaseChecker[*HTTPBasedCheckerProcessInfo]
 
        CheckType         base.CheckType
-       CauseType         v3.ContinuousProfilingCauseType
+       MonitorType       v3.ContinuousProfilingTriggeredMonitorType
        ThresholdGenerate func(val string) (float64, error)
 }
 
 func NewHTTPBasedChecker[Data base.WindowData[network.BufferEvent, 
float64]](checkType base.CheckType,
        thresholdGenerator func(val string) (float64, error), dataGenerator 
func() base.WindowData[network.BufferEvent, float64],
-       causeType v3.ContinuousProfilingCauseType) *HTTPBasedChecker[Data] {
+       monitorType v3.ContinuousProfilingTriggeredMonitorType) 
*HTTPBasedChecker[Data] {
        checker := &HTTPBasedChecker[Data]{
                CheckType:         checkType,
                ThresholdGenerate: thresholdGenerator,
-               CauseType:         causeType,
+               MonitorType:       monitorType,
        }
        checker.BaseChecker = NewBaseChecker[*HTTPBasedCheckerProcessInfo](
                func(p api.ProcessInterface, older 
*HTTPBasedCheckerProcessInfo, items []*base.PolicyItem) 
*HTTPBasedCheckerProcessInfo {
@@ -204,7 +204,7 @@ func (n *HTTPBasedChecker[Data]) Check(ctx 
base.CheckContext, metricsAppender *b
                                        return val >= itemInfo.threshold
                                }); isMatch {
                                        causes = append(causes, 
NewURICause(pidPolicies.Process, false, uri, item,
-                                               n.CauseType, 
itemInfo.threshold, lastMatch))
+                                               n.MonitorType, 
itemInfo.threshold, lastMatch))
                                }
                        }
 
@@ -213,7 +213,7 @@ func (n *HTTPBasedChecker[Data]) Check(ctx 
base.CheckContext, metricsAppender *b
                                return val >= itemInfo.threshold
                        }); isMatch {
                                causes = append(causes, 
NewURICause(pidPolicies.Process, itemInfo.uriRegex != nil, globalURI, item,
-                                       n.CauseType, itemInfo.threshold, 
lastMatch))
+                                       n.MonitorType, itemInfo.threshold, 
lastMatch))
                        }
                }
        }
@@ -225,7 +225,7 @@ func (n *HTTPBasedChecker[Data]) flushMetrics(uri string, 
windows *base.TimeWind
        if uri == "" {
                uri = "global"
        }
-       if data, hasUpdate := windows.FlushMultipleWriteData(); hasUpdate {
+       if data, hasUpdate := windows.FlushMultipleRecentData(); hasUpdate {
                // flush each slot data
                for _, d := range data {
                        
metricsAppender.AppendProcessSingleValue(strings.ToLower(string(n.CheckType)), 
process, map[string]string{
diff --git a/pkg/profiling/continuous/checker/common/process_checker.go 
b/pkg/profiling/continuous/checker/common/process_checker.go
index ad1f42e..8298506 100644
--- a/pkg/profiling/continuous/checker/common/process_checker.go
+++ b/pkg/profiling/continuous/checker/common/process_checker.go
@@ -34,16 +34,16 @@ type ProcessBasedChecker[V numbers] struct {
        *BaseChecker[*ProcessBasedInfo[V]]
 
        CheckType         base.CheckType
-       CauseType         v3.ContinuousProfilingCauseType
+       MonitorType       v3.ContinuousProfilingTriggeredMonitorType
        ThresholdGenerate func(val string) (V, error)
        DataGenerate      func(process api.ProcessInterface) (V, error)
 }
 
 func NewProcessBasedChecker[V numbers](checkType base.CheckType, 
thresholdGenerator func(val string) (V, error),
-       dataGenerator func(p api.ProcessInterface) (V, error), causeType 
v3.ContinuousProfilingCauseType) *ProcessBasedChecker[V] {
+       dataGenerator func(p api.ProcessInterface) (V, error), monitorType 
v3.ContinuousProfilingTriggeredMonitorType) *ProcessBasedChecker[V] {
        checker := &ProcessBasedChecker[V]{
                CheckType:         checkType,
-               CauseType:         causeType,
+               MonitorType:       monitorType,
                ThresholdGenerate: thresholdGenerator,
                DataGenerate:      dataGenerator,
        }
@@ -118,7 +118,7 @@ func (r *ProcessBasedChecker[V]) Check(ctx 
base.CheckContext, metricsAppender *b
        causes := make([]base.ThresholdCause, 0)
        for _, info := range r.PidWithInfos {
                for _, threshold := range info.Policies {
-                       if data, hasData := info.Windows.FlushLastWriteData(); 
hasData {
+                       if data, hasData := info.Windows.FlushMostRecentData(); 
hasData {
                                
metricsAppender.AppendProcessSingleValue(strings.ToLower(string(r.CheckType)), 
info.Process, nil, float64(data))
                        }
                        if !ctx.ShouldCheck(info.Process, threshold.Policy) {
@@ -129,7 +129,7 @@ func (r *ProcessBasedChecker[V]) Check(ctx 
base.CheckContext, metricsAppender *b
                                return val >= threshold.Threshold
                        }); enable {
                                causes = append(causes,
-                                       NewSingleValueCause(info.Process, 
threshold.Policy, r.CauseType, float64(threshold.Threshold), 
float64(lastMatch)))
+                                       NewSingleValueCause(info.Process, 
threshold.Policy, r.MonitorType, float64(threshold.Threshold), 
float64(lastMatch)))
                        }
                }
        }
diff --git a/pkg/profiling/continuous/checker/common/system_checker.go 
b/pkg/profiling/continuous/checker/common/system_checker.go
index 5cc4e60..1b9475f 100644
--- a/pkg/profiling/continuous/checker/common/system_checker.go
+++ b/pkg/profiling/continuous/checker/common/system_checker.go
@@ -30,7 +30,7 @@ import (
 
 type SystemBasedChecker[V numbers] struct {
        CheckType         base.CheckType
-       CauseType         v3.ContinuousProfilingCauseType
+       MonitorType       v3.ContinuousProfilingTriggeredMonitorType
        ThresholdGenerate func(val string) (V, error)
        DataGenerate      func() (V, error)
        GlobalWindows     *base.TimeWindows[V, V]
@@ -39,10 +39,10 @@ type SystemBasedChecker[V numbers] struct {
 }
 
 func NewSystemBasedChecker[V numbers](checkType base.CheckType, 
thresholdGenerator func(val string) (V, error),
-       dataGenerator func() (V, error), causeType 
v3.ContinuousProfilingCauseType) *SystemBasedChecker[V] {
+       dataGenerator func() (V, error), monitorType 
v3.ContinuousProfilingTriggeredMonitorType) *SystemBasedChecker[V] {
        checker := &SystemBasedChecker[V]{
                CheckType:         checkType,
-               CauseType:         causeType,
+               MonitorType:       monitorType,
                ThresholdGenerate: thresholdGenerator,
                DataGenerate:      dataGenerator,
                GlobalWindows: base.NewTimeWindows[V, V](nil, func() 
base.WindowData[V, V] {
@@ -100,7 +100,7 @@ func (s *SystemBasedChecker[V]) Check(ctx 
base.CheckContext, metricsAppender *ba
        }
 
        causes := make([]base.ThresholdCause, 0)
-       data, hasData := s.GlobalWindows.FlushLastWriteData()
+       data, hasData := s.GlobalWindows.FlushMostRecentData()
 
        for _, policy := range s.Policies {
                if hasData {
@@ -120,7 +120,7 @@ func (s *SystemBasedChecker[V]) Check(ctx 
base.CheckContext, metricsAppender *ba
                                continue
                        }
 
-                       causes = append(causes, NewSingleValueCause(p, 
policy.Policy, s.CauseType, float64(policy.Threshold), float64(lastMatch)))
+                       causes = append(causes, NewSingleValueCause(p, 
policy.Policy, s.MonitorType, float64(policy.Threshold), float64(lastMatch)))
                }
        }
 
diff --git a/pkg/profiling/continuous/checker/network_error_rate.go 
b/pkg/profiling/continuous/checker/network_error_rate.go
index 7ba08c9..0a43695 100644
--- a/pkg/profiling/continuous/checker/network_error_rate.go
+++ b/pkg/profiling/continuous/checker/network_error_rate.go
@@ -45,7 +45,7 @@ func (n *NetworkHTTPErrorRateChecker) Init(config 
*base.ContinuousConfig) error
                        return v, nil
                }, func() base.WindowData[network.BufferEvent, float64] {
                        return &processNetworkResponseErrorStatics{}
-               }, v3.ContinuousProfilingCauseType_HTTPErrorRate)
+               }, v3.ContinuousProfilingTriggeredMonitorType_HTTPErrorRate)
        return nil
 }
 
diff --git a/pkg/profiling/continuous/checker/network_response_time.go 
b/pkg/profiling/continuous/checker/network_response_time.go
index 7dd1754..a4ef34b 100644
--- a/pkg/profiling/continuous/checker/network_response_time.go
+++ b/pkg/profiling/continuous/checker/network_response_time.go
@@ -41,7 +41,7 @@ func (n *NetworkHTTPAvgResponseTimeChecker) Init(config 
*base.ContinuousConfig)
                        return strconv.ParseFloat(val, 64)
                }, func() base.WindowData[network.BufferEvent, float64] {
                        return &processNetworkAvgResponseTimeStatics{}
-               }, v3.ContinuousProfilingCauseType_HTTPAvgResponseTime)
+               }, 
v3.ContinuousProfilingTriggeredMonitorType_HTTPAvgResponseTime)
        return nil
 }
 
diff --git a/pkg/profiling/continuous/checker/process_cpu.go 
b/pkg/profiling/continuous/checker/process_cpu.go
index f0f226e..ef7de27 100644
--- a/pkg/profiling/continuous/checker/process_cpu.go
+++ b/pkg/profiling/continuous/checker/process_cpu.go
@@ -48,6 +48,6 @@ func (r *ProcessCPUChecker) Init(config 
*base.ContinuousConfig) error {
                        return 0, err
                }
                return percent * 100, nil
-       }, v3.ContinuousProfilingCauseType_ProcessCPU)
+       }, v3.ContinuousProfilingTriggeredMonitorType_ProcessCPU)
        return nil
 }
diff --git a/pkg/profiling/continuous/checker/process_thread.go 
b/pkg/profiling/continuous/checker/process_thread.go
index 566abe2..2e0ab38 100644
--- a/pkg/profiling/continuous/checker/process_thread.go
+++ b/pkg/profiling/continuous/checker/process_thread.go
@@ -42,6 +42,6 @@ func (t *ProcessThreadCountChecker) Init(config 
*base.ContinuousConfig) error {
        }, func(p api.ProcessInterface) (int32, error) {
                threads, err := p.OriginalProcess().NumThreads()
                return threads, err
-       }, v3.ContinuousProfilingCauseType_ProcessThreadCount)
+       }, v3.ContinuousProfilingTriggeredMonitorType_ProcessThreadCount)
        return nil
 }
diff --git a/pkg/profiling/continuous/checker/system_load.go 
b/pkg/profiling/continuous/checker/system_load.go
index f77b949..689de5a 100644
--- a/pkg/profiling/continuous/checker/system_load.go
+++ b/pkg/profiling/continuous/checker/system_load.go
@@ -46,6 +46,6 @@ func (s *SystemLoadChecker) Init(config 
*base.ContinuousConfig) error {
                                return 0, err
                        }
                        return avg.Load1, nil
-               }, v3.ContinuousProfilingCauseType_SystemLoad)
+               }, v3.ContinuousProfilingTriggeredMonitorType_SystemLoad)
        return nil
 }
diff --git a/pkg/profiling/continuous/checkers.go 
b/pkg/profiling/continuous/checkers.go
index 1e4cc05..45dd12f 100644
--- a/pkg/profiling/continuous/checkers.go
+++ b/pkg/profiling/continuous/checkers.go
@@ -286,15 +286,24 @@ func (c *Checkers) queryPolicyUpdates(servicePolicies 
map[string]string) (map[st
        if err != nil {
                return nil, err
        }
+       // no update
+       if len(policyUpdateCommands.GetCommands()) == 0 {
+               return nil, nil
+       }
 
-       if len(policyUpdateCommands.GetCommands()) != 1 ||
-               policyUpdateCommands.GetCommands()[0].GetCommand() != 
"ContinuousProfilingPolicyQuery" ||
-               len(policyUpdateCommands.GetCommands()[0].GetArgs()) != 1 ||
-               policyUpdateCommands.GetCommands()[0].GetArgs()[0].GetKey() != 
"ServiceWithPolicyJSON" {
+       var policyJSON string
+       if len(policyUpdateCommands.GetCommands()) == 1 && 
policyUpdateCommands.GetCommands()[0].GetCommand() == 
"ContinuousProfilingPolicyQuery" {
+               for _, arg := range 
policyUpdateCommands.GetCommands()[0].GetArgs() {
+                       if arg.GetKey() == "ServiceWithPolicyJSON" {
+                               policyJSON = arg.GetValue()
+                               break
+                       }
+               }
+       }
+       if policyJSON == "" {
                return nil, fmt.Errorf("the query policy response not adapt")
        }
 
-       policyJSON := 
policyUpdateCommands.GetCommands()[0].GetArgs()[0].GetValue()
        updates := make([]*QueryPolicyUpdate, 0)
        err = json.Unmarshal([]byte(policyJSON), &updates)
        if err != nil {

Reply via email to