This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/develop by this push:
new 8c95837ed feat(metrics): enhance the monitoring metrics of rpc call failure under Triple protocol (#3189)
8c95837ed is described below
commit 8c95837edbdd3700ca2dc4bc6c330fca2dc01c86
Author: LunaRain_079 <[email protected]>
AuthorDate: Sun Mar 1 20:46:20 2026 +0800
feat(metrics): enhance the monitoring metrics of rpc call failure under Triple protocol (#3189)
* feat(metrics): enhance error classification and granular metrics for failed requests
* feat(metrics): add network failure and codec error classifications
* test(event): improve end time assertions for event tests
* feat(metrics): add aggregate metrics for failed requests by type
* chore(collector): fix import format
* test(event): correct assertions for event end time checks
* test(metric): add tests for error_classifier.go
* test(metric): fix golangci-lint errors
* refactor(metrics): refactor error handling in incRequestsFailedByType method
* refactor(metrics): update error type constants for clarity and consistency
---
metrics/registry/event_test.go | 6 +-
metrics/rpc/collector.go | 36 ++++++-
metrics/rpc/error_classifier.go | 68 ++++++++++++
metrics/rpc/error_classifier_test.go | 200 +++++++++++++++++++++++++++++++++++
metrics/rpc/metric_set.go | 36 +++++++
5 files changed, 342 insertions(+), 4 deletions(-)
diff --git a/metrics/registry/event_test.go b/metrics/registry/event_test.go
index d244a7913..fdd09ef97 100644
--- a/metrics/registry/event_test.go
+++ b/metrics/registry/event_test.go
@@ -61,7 +61,7 @@ func TestNewRegisterEvent(t *testing.T) {
assert.Equal(t, Reg, event.Name)
assert.True(t, event.Succ)
assert.Equal(t, start, event.Start)
- assert.True(t, event.End.After(start))
+ assert.False(t, event.End.Before(start))
}
func TestNewSubscribeEvent(t *testing.T) {
@@ -79,7 +79,7 @@ func TestNewNotifyEvent(t *testing.T) {
assert.NotNil(t, event)
assert.Equal(t, Notify, event.Name)
assert.Equal(t, start, event.Start)
- assert.True(t, event.End.After(start))
+ assert.False(t, event.End.Before(start))
}
func TestNewDirectoryEvent(t *testing.T) {
@@ -99,7 +99,7 @@ func TestNewServerRegisterEvent(t *testing.T) {
assert.Equal(t, ServerReg, event.Name)
assert.True(t, event.Succ)
assert.Equal(t, start, event.Start)
- assert.True(t, event.End.After(start))
+ assert.False(t, event.End.Before(start))
}
func TestNewServerSubscribeEvent(t *testing.T) {
diff --git a/metrics/rpc/collector.go b/metrics/rpc/collector.go
index 08881382c..c75733172 100644
--- a/metrics/rpc/collector.go
+++ b/metrics/rpc/collector.go
@@ -96,8 +96,11 @@ func (c *rpcCollector) afterInvokeHandler(event *metricsEvent) {
if event.result.Error() == nil {
c.incRequestsSucceedTotal(role, labels)
} else {
- // TODO: Breaking down RPC exceptions further
+ // Increment total failed count
c.incRequestsFailedTotal(role, labels)
+ // Classify and increment granular error metrics
+ errType := classifyError(event.result.Error())
+ c.incRequestsFailedByType(role, labels, errType)
}
}
c.reportRTMilliseconds(role, labels, event.costTime.Milliseconds())
@@ -163,6 +166,37 @@ func (c *rpcCollector) incRequestsFailedTotal(role string, labels map[string]str
}
}
+func (c *rpcCollector) incRequestsFailedByType(role string, labels map[string]string, errType ErrorType) {
+ var ms *rpcCommonMetrics
+
+ switch role {
+ case constant.SideProvider:
+ ms = &c.metricSet.provider.rpcCommonMetrics
+ case constant.SideConsumer:
+ ms = &c.metricSet.consumer.rpcCommonMetrics
+ default:
+ return
+ }
+
+ switch errType {
+ case ErrorTypeTimeout:
+ ms.requestsTimeoutTotal.Inc(labels)
+ ms.requestsTimeoutTotalAggregate.Inc(labels)
+ case ErrorTypeLimit:
+ ms.requestsLimitTotal.Inc(labels)
+ ms.requestsLimitTotalAggregate.Inc(labels)
+ case ErrorTypeServiceUnavailable:
+ ms.requestsServiceUnavailableTotal.Inc(labels)
+ ms.requestsServiceUnavailableTotalAggregate.Inc(labels)
+ case ErrorTypeBusinessFailed:
+ ms.requestsBusinessFailedTotal.Inc(labels)
+ ms.requestsBusinessFailedTotalAggregate.Inc(labels)
+ default:
+ ms.requestsUnknownFailedTotal.Inc(labels)
+ ms.requestsUnknownFailedTotalAggregate.Inc(labels)
+ }
+}
+
func (c *rpcCollector) reportRTMilliseconds(role string, labels map[string]string, cost int64) {
switch role {
case constant.SideProvider:
diff --git a/metrics/rpc/error_classifier.go b/metrics/rpc/error_classifier.go
new file mode 100644
index 000000000..66153f68b
--- /dev/null
+++ b/metrics/rpc/error_classifier.go
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package rpc
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/protocol/triple/triple_protocol"
+)
+
+// ErrorType represents the classification of RPC errors
+type ErrorType uint8
+
+const (
+ // ErrorTypeUnknown is for unknown or unclassified errors
+ ErrorTypeUnknown ErrorType = 0
+ // ErrorTypeTimeout is for timeout exceptions (CodeDeadlineExceeded)
+ ErrorTypeTimeout ErrorType = 1
+ // ErrorTypeLimit is for rate limit exceeded exceptions (CodeResourceExhausted)
+ ErrorTypeLimit ErrorType = 2
+ // ErrorTypeServiceUnavailable is for service unavailable exceptions (CodeUnavailable, CodePermissionDenied)
+ ErrorTypeServiceUnavailable ErrorType = 3
+ // ErrorTypeBusinessFailed is for business logic exceptions (CodeBizError)
+ ErrorTypeBusinessFailed ErrorType = 4
+ // ErrorTypeNetworkFailure is for network failure exceptions (CodeInternal)
+ // TODO: At present, this error type has not been produced. If available, please map the appropriate internal/network error code to this type.
+ ErrorTypeNetworkFailure ErrorType = 5
+ // ErrorTypeCodec is for codec errors (CodeInternal)
+ // TODO: At present, this error type has not been produced. If available, please map the appropriate internal/codec error code to this type.
+ ErrorTypeCodec ErrorType = 6
+)
+
+// classifyError classifies an error based on triple protocol error codes.
+// This function supports triple and gRPC protocol errors.
+func classifyError(err error) ErrorType {
+ if err == nil {
+ return ErrorTypeUnknown
+ }
+ // TODO: Support dubbo protocol error classification
+ // Get the error code from triple protocol error
+ code := triple_protocol.CodeOf(err)
+
+ switch code {
+ case triple_protocol.CodeDeadlineExceeded:
+ return ErrorTypeTimeout
+ case triple_protocol.CodeResourceExhausted:
+ return ErrorTypeLimit
+ case triple_protocol.CodeUnavailable, triple_protocol.CodePermissionDenied:
+ return ErrorTypeServiceUnavailable
+ case triple_protocol.CodeBizError:
+ return ErrorTypeBusinessFailed
+ default:
+ return ErrorTypeUnknown
+ }
+}
diff --git a/metrics/rpc/error_classifier_test.go b/metrics/rpc/error_classifier_test.go
new file mode 100644
index 000000000..378991ba0
--- /dev/null
+++ b/metrics/rpc/error_classifier_test.go
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package rpc
+
+import (
+ "errors"
+ "testing"
+)
+
+import (
+ "github.com/stretchr/testify/assert"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/protocol/triple/triple_protocol"
+)
+
+func TestClassifyError_Timeout(t *testing.T) {
+ err := triple_protocol.NewError(triple_protocol.CodeDeadlineExceeded, errors.New("timeout"))
+ errType := classifyError(err)
+ assert.Equal(t, ErrorTypeTimeout, errType)
+}
+
+func TestClassifyError_Limit(t *testing.T) {
+ err := triple_protocol.NewError(triple_protocol.CodeResourceExhausted, errors.New("limit exceeded"))
+ errType := classifyError(err)
+ assert.Equal(t, ErrorTypeLimit, errType)
+}
+
+func TestClassifyError_ServiceUnavailable(t *testing.T) {
+ tests := []struct {
+ name string
+ code triple_protocol.Code
+ }{
+ {
+ name: "CodeUnavailable",
+ code: triple_protocol.CodeUnavailable,
+ },
+ {
+ name: "CodePermissionDenied",
+ code: triple_protocol.CodePermissionDenied,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ err := triple_protocol.NewError(tt.code, errors.New("service unavailable"))
+ errType := classifyError(err)
+ assert.Equal(t, ErrorTypeServiceUnavailable, errType)
+ })
+ }
+}
+
+func TestClassifyError_BusinessFailed(t *testing.T) {
+ err := triple_protocol.NewError(triple_protocol.CodeBizError, errors.New("business error"))
+ errType := classifyError(err)
+ assert.Equal(t, ErrorTypeBusinessFailed, errType)
+}
+
+func TestClassifyError_Unknown(t *testing.T) {
+ tests := []struct {
+ name string
+ err error
+ }{
+ {
+ name: "standard error",
+ err: errors.New("some error"),
+ },
+ {
+ name: "CodeUnknown",
+ err: triple_protocol.NewError(triple_protocol.CodeUnknown, errors.New("unknown error")),
+ },
+ {
+ name: "CodeCanceled",
+ err: triple_protocol.NewError(triple_protocol.CodeCanceled, errors.New("canceled")),
+ },
+ {
+ name: "CodeInvalidArgument",
+ err: triple_protocol.NewError(triple_protocol.CodeInvalidArgument, errors.New("invalid argument")),
+ },
+ {
+ name: "CodeNotFound",
+ err: triple_protocol.NewError(triple_protocol.CodeNotFound, errors.New("not found")),
+ },
+ {
+ name: "CodeAlreadyExists",
+ err: triple_protocol.NewError(triple_protocol.CodeAlreadyExists, errors.New("already exists")),
+ },
+ {
+ name: "CodeAborted",
+ err: triple_protocol.NewError(triple_protocol.CodeAborted, errors.New("aborted")),
+ },
+ {
+ name: "CodeOutOfRange",
+ err: triple_protocol.NewError(triple_protocol.CodeOutOfRange, errors.New("out of range")),
+ },
+ {
+ name: "CodeUnimplemented",
+ err: triple_protocol.NewError(triple_protocol.CodeUnimplemented, errors.New("unimplemented")),
+ },
+ {
+ name: "CodeInternal",
+ err: triple_protocol.NewError(triple_protocol.CodeInternal, errors.New("internal error")),
+ },
+ {
+ name: "CodeDataLoss",
+ err: triple_protocol.NewError(triple_protocol.CodeDataLoss, errors.New("data loss")),
+ },
+ {
+ name: "CodeUnauthenticated",
+ err: triple_protocol.NewError(triple_protocol.CodeUnauthenticated, errors.New("unauthenticated")),
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ errType := classifyError(tt.err)
+ assert.Equal(t, ErrorTypeUnknown, errType)
+ })
+ }
+}
+
+func TestErrorType_Values(t *testing.T) {
+ // Verify ErrorType constants have expected values
+ assert.Equal(t, ErrorTypeUnknown, ErrorType(0))
+ assert.Equal(t, ErrorTypeTimeout, ErrorType(1))
+ assert.Equal(t, ErrorTypeLimit, ErrorType(2))
+ assert.Equal(t, ErrorTypeServiceUnavailable, ErrorType(3))
+ assert.Equal(t, ErrorTypeBusinessFailed, ErrorType(4))
+ assert.Equal(t, ErrorTypeNetworkFailure, ErrorType(5))
+ assert.Equal(t, ErrorTypeCodec, ErrorType(6))
+}
+
+func TestClassifyError_AllErrorTypesClassification(t *testing.T) {
+ // Test that we can classify into all defined error types
+ // (except ErrorTypeNetworkFailure and ErrorTypeCodec which are not yet used in classifyError)
+ tests := []struct {
+ name string
+ err error
+ expected ErrorType
+ }{
+ {
+ name: "timeout",
+ err: triple_protocol.NewError(triple_protocol.CodeDeadlineExceeded, errors.New("timeout")),
+ expected: ErrorTypeTimeout,
+ },
+ {
+ name: "limit",
+ err: triple_protocol.NewError(triple_protocol.CodeResourceExhausted, errors.New("limit")),
+ expected: ErrorTypeLimit,
+ },
+ {
+ name: "unavailable",
+ err: triple_protocol.NewError(triple_protocol.CodeUnavailable, errors.New("unavailable")),
+ expected: ErrorTypeServiceUnavailable,
+ },
+ {
+ name: "permission denied",
+ err: triple_protocol.NewError(triple_protocol.CodePermissionDenied, errors.New("denied")),
+ expected: ErrorTypeServiceUnavailable,
+ },
+ {
+ name: "business failed",
+ err: triple_protocol.NewError(triple_protocol.CodeBizError, errors.New("biz error")),
+ expected: ErrorTypeBusinessFailed,
+ },
+ {
+ name: "nil error",
+ err: nil,
+ expected: ErrorTypeUnknown,
+ },
+ {
+ name: "unknown error",
+ err: errors.New("unknown"),
+ expected: ErrorTypeUnknown,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ errType := classifyError(tt.err)
+ assert.Equal(t, tt.expected, errType)
+ })
+ }
+}
diff --git a/metrics/rpc/metric_set.go b/metrics/rpc/metric_set.go
index aa349467c..7fb431d84 100644
--- a/metrics/rpc/metric_set.go
+++ b/metrics/rpc/metric_set.go
@@ -48,6 +48,18 @@ type rpcCommonMetrics struct {
rtMilliseconds metrics.RtVec
rtMillisecondsQuantiles metrics.QuantileMetricVec
rtMillisecondsAggregate metrics.RtVec
+
+ // Granular error metrics
+ requestsTimeoutTotal metrics.CounterVec
+ requestsTimeoutTotalAggregate metrics.AggregateCounterVec
+ requestsLimitTotal metrics.CounterVec
+ requestsLimitTotalAggregate metrics.AggregateCounterVec
+ requestsServiceUnavailableTotal metrics.CounterVec
+ requestsServiceUnavailableTotalAggregate metrics.AggregateCounterVec
+ requestsBusinessFailedTotal metrics.CounterVec
+ requestsBusinessFailedTotalAggregate metrics.AggregateCounterVec
+ requestsUnknownFailedTotal metrics.CounterVec
+ requestsUnknownFailedTotalAggregate metrics.AggregateCounterVec
}
// buildMetricSet will call init functions to initialize the metricSet
@@ -84,6 +96,18 @@ func (pm *providerMetrics) init(registry metrics.MetricRegistry) {
metrics.NewMetricKey("dubbo_provider_rt_milliseconds_p95", "The total response time spent by providers processing 95% of requests"),
metrics.NewMetricKey("dubbo_provider_rt_milliseconds_p99", "The total response time spent by providers processing 99% of requests"),
}, []float64{0.5, 0.9, 0.95, 0.99})
+
+ // Granular error metrics
+ pm.requestsTimeoutTotal = metrics.NewCounterVec(registry, metrics.NewMetricKey("dubbo_provider_requests_timeout_total", "Total Timeout Failed Requests"))
+ pm.requestsTimeoutTotalAggregate = metrics.NewAggregateCounterVec(registry, metrics.NewMetricKey("dubbo_provider_requests_timeout_total_aggregate", "Total Timeout Failed Requests under the sliding window"))
+ pm.requestsLimitTotal = metrics.NewCounterVec(registry, metrics.NewMetricKey("dubbo_provider_requests_limit_total", "Total Limit Failed Requests"))
+ pm.requestsLimitTotalAggregate = metrics.NewAggregateCounterVec(registry, metrics.NewMetricKey("dubbo_provider_requests_limit_total_aggregate", "Total Limit Failed Requests under the sliding window"))
+ pm.requestsServiceUnavailableTotal = metrics.NewCounterVec(registry, metrics.NewMetricKey("dubbo_provider_requests_failed_service_unavailable_total", "Total Service Unavailable Failed Requests"))
+ pm.requestsServiceUnavailableTotalAggregate = metrics.NewAggregateCounterVec(registry, metrics.NewMetricKey("dubbo_provider_requests_failed_service_unavailable_total_aggregate", "Total Service Unavailable Failed Requests under the sliding window"))
+ pm.requestsBusinessFailedTotal = metrics.NewCounterVec(registry, metrics.NewMetricKey("dubbo_provider_requests_business_failed_total", "Total Failed Business Requests"))
+ pm.requestsBusinessFailedTotalAggregate = metrics.NewAggregateCounterVec(registry, metrics.NewMetricKey("dubbo_provider_requests_business_failed_total_aggregate", "Total Failed Business Requests under the sliding window"))
+ pm.requestsUnknownFailedTotal = metrics.NewCounterVec(registry, metrics.NewMetricKey("dubbo_provider_requests_unknown_failed_total", "Total Unknown Failed Requests"))
+ pm.requestsUnknownFailedTotalAggregate = metrics.NewAggregateCounterVec(registry, metrics.NewMetricKey("dubbo_provider_requests_unknown_failed_total_aggregate", "Total Unknown Failed Requests under the sliding window"))
}
func (cm *consumerMetrics) init(registry metrics.MetricRegistry) {
@@ -109,4 +133,16 @@ func (cm *consumerMetrics) init(registry metrics.MetricRegistry) {
metrics.NewMetricKey("dubbo_consumer_rt_milliseconds_p95", "The total response time spent by consumers processing 95% of requests"),
metrics.NewMetricKey("dubbo_consumer_rt_milliseconds_p99", "The total response time spent by consumers processing 99% of requests"),
}, []float64{0.5, 0.9, 0.95, 0.99})
+
+ // Granular error metrics
+ cm.requestsTimeoutTotal = metrics.NewCounterVec(registry, metrics.NewMetricKey("dubbo_consumer_requests_timeout_total", "Total Timeout Failed Requests"))
+ cm.requestsTimeoutTotalAggregate = metrics.NewAggregateCounterVec(registry, metrics.NewMetricKey("dubbo_consumer_requests_timeout_total_aggregate", "Total Timeout Failed Requests under the sliding window"))
+ cm.requestsLimitTotal = metrics.NewCounterVec(registry, metrics.NewMetricKey("dubbo_consumer_requests_limit_total", "Total Limit Failed Requests"))
+ cm.requestsLimitTotalAggregate = metrics.NewAggregateCounterVec(registry, metrics.NewMetricKey("dubbo_consumer_requests_limit_total_aggregate", "Total Limit Failed Requests under the sliding window"))
+ cm.requestsServiceUnavailableTotal = metrics.NewCounterVec(registry, metrics.NewMetricKey("dubbo_consumer_requests_failed_service_unavailable_total", "Total Service Unavailable Failed Requests"))
+ cm.requestsServiceUnavailableTotalAggregate = metrics.NewAggregateCounterVec(registry, metrics.NewMetricKey("dubbo_consumer_requests_failed_service_unavailable_total_aggregate", "Total Service Unavailable Failed Requests under the sliding window"))
+ cm.requestsBusinessFailedTotal = metrics.NewCounterVec(registry, metrics.NewMetricKey("dubbo_consumer_requests_business_failed_total", "Total Failed Business Requests"))
+ cm.requestsBusinessFailedTotalAggregate = metrics.NewAggregateCounterVec(registry, metrics.NewMetricKey("dubbo_consumer_requests_business_failed_total_aggregate", "Total Failed Business Requests under the sliding window"))
+ cm.requestsUnknownFailedTotal = metrics.NewCounterVec(registry, metrics.NewMetricKey("dubbo_consumer_requests_unknown_failed_total", "Total Unknown Failed Requests"))
+ cm.requestsUnknownFailedTotalAggregate = metrics.NewAggregateCounterVec(registry, metrics.NewMetricKey("dubbo_consumer_requests_unknown_failed_total_aggregate", "Total Unknown Failed Requests under the sliding window"))
}