This is an automated email from the ASF dual-hosted git repository.
rfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e6cf666 [Issue 9772][Go Functions]Allow User Metrics (#12013)
e6cf666 is described below
commit e6cf6660ca0d3e18270e5c638d49611ca3e8c453
Author: Andy Walker <[email protected]>
AuthorDate: Wed Feb 23 19:09:09 2022 -0500
[Issue 9772][Go Functions]Allow User Metrics (#12013)
* Support user-provided metrics
* add docs
* fix typo
* fix function example
* Address CR requests
- Moved RecordMetric into context
- Initialized user SummaryVec with objectives to match the Java implementation
- provided getMetrics() results for pulsar-admin/pulsarctl
* remove spurious convert
* tests for user metrics
* Merge tests with subtests to avoid multiple metric
server spin-ups and mark with nolint.
* Revert "Merge tests with subtests to avoid multiple metric"
This reverts commit 64d888c57251b69abd21e28d2eafd628fedf35f8.
* Keep metricsServicer.close() from ruining tests
- this will help in testing the fix for #12314 as well
---
.../examples/userMetricsFunc/userMetricsFunc.go | 44 +++++++++++++++++++
pulsar-function-go/pf/context.go | 20 +++++++++
pulsar-function-go/pf/instance.go | 51 ++++++++++++++++++++--
pulsar-function-go/pf/stats.go | 21 ++++++++-
pulsar-function-go/pf/stats_test.go | 43 ++++++++++++++++++
site2/docs/functions-develop.md | 17 +++++++-
6 files changed, 191 insertions(+), 5 deletions(-)
diff --git a/pulsar-function-go/examples/userMetricsFunc/userMetricsFunc.go
b/pulsar-function-go/examples/userMetricsFunc/userMetricsFunc.go
new file mode 100644
index 0000000..17764d2
--- /dev/null
+++ b/pulsar-function-go/examples/userMetricsFunc/userMetricsFunc.go
@@ -0,0 +1,44 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+ "context"
+ "errors"
+
+ "github.com/apache/pulsar/pulsar-function-go/pf"
+)
+
+func metricRecorderFunction(ctx context.Context, in []byte) error {
+ inputstr := string(in)
+ fctx, ok := pf.FromContext(ctx) // extract the pf FunctionContext carried by ctx
+ if !ok {
+ return errors.New("get Go Functions Context error")
+ }
+ fctx.RecordMetric("hit-count", 1) // one observation of value 1 per message; the summary's sample count tracks hits
+ if inputstr == "eleven" {
+ fctx.RecordMetric("elevens-count", 1) // observed only when the payload is exactly "eleven"
+ }
+ return nil
+}
+
+func main() {
+ pf.Start(metricRecorderFunction) // entry point: hand the handler to the pf runtime
+}
diff --git a/pulsar-function-go/pf/context.go b/pulsar-function-go/pf/context.go
index 599b2eb..fd3da9b 100644
--- a/pulsar-function-go/pf/context.go
+++ b/pulsar-function-go/pf/context.go
@@ -22,9 +22,11 @@ package pf
import (
"context"
"encoding/json"
+ "sync"
"time"
"github.com/apache/pulsar-client-go/pulsar"
+ "github.com/prometheus/client_golang/prometheus"
)
// FunctionContext provides contextual information to the executing function.
@@ -36,6 +38,7 @@ type FunctionContext struct {
userConfigs map[string]interface{}
logAppender *LogAppender
outputMessage func(topic string) pulsar.Producer
+ userMetrics sync.Map
record pulsar.Message
}
@@ -174,6 +177,23 @@ func (c *FunctionContext) GetMetricsPort() int {
return c.instanceConf.metricsPort
}
+// RecordMetric records metricValue as one observation in the user_metric summary, in the series labeled with this function's identity (tenant, namespace, name, instance_id, cluster, fqfn) plus metric=metricName.
+func (c *FunctionContext) RecordMetric(metricName string, metricValue float64)
{
+ v, ok := c.userMetrics.Load(metricName) // cached Observer for metricName, if any
+ if !ok {
+ v, _ = c.userMetrics.LoadOrStore(metricName,
userMetricSummary.WithLabelValues(
+ c.GetFuncTenant(),
+ c.GetTenantAndNamespace(),
+ c.GetFuncName(),
+ c.GetFuncID(),
+ c.GetClusterName(),
+ c.GetTenantAndNamespaceAndName(),
+ metricName,
+ )) // LoadOrStore returns the already-stored Observer if another goroutine stored one first
+ }
+ v.(prometheus.Observer).Observe(metricValue)
+}
+
// An unexported type to be used as the key for types in this package. This
// prevents collisions with keys defined in other packages.
type key struct{}
diff --git a/pulsar-function-go/pf/instance.go
b/pulsar-function-go/pf/instance.go
index def254b..b03a79b 100644
--- a/pulsar-function-go/pf/instance.go
+++ b/pulsar-function-go/pf/instance.go
@@ -507,6 +507,7 @@ func (gi *goInstance) getMetrics() *pb.MetricsData {
totalUserExceptions1min := gi.getTotalUserExceptions1min()
totalSysExceptions1min := gi.getTotalSysExceptions1min()
//avg_process_latency_ms_1min := gi.get_avg_process_latency_1min()
+ userMetricsMap := gi.getUserMetricsMap()
metricsData := pb.MetricsData{}
// total metrics
@@ -522,6 +523,9 @@ func (gi *goInstance) getMetrics() *pb.MetricsData {
metricsData.SystemExceptionsTotal_1Min = int64(totalSysExceptions1min)
metricsData.UserExceptionsTotal_1Min = int64(totalUserExceptions1min)
+ // user metrics
+ metricsData.UserMetrics = userMetricsMap
+
return &metricsData
}
@@ -546,6 +550,13 @@ func (gi *goInstance) getMatchingMetricFunc() func(lbl
*prometheus_client.LabelP
}
func (gi *goInstance) getMatchingMetricFromRegistry(metricName string)
prometheus_client.Metric {
+ filteredMetricFamilies := gi.getFilteredMetricFamilies(metricName) // gathered families selected for metricName
+ metricFunc := gi.getMatchingMetricFunc() // label predicate; presumably selects this instance's series
+ matchingMetric := getFirstMatch(filteredMetricFamilies[0].Metric,
metricFunc) // NOTE(review): panics if no family was gathered ([0]); presumably also if no series matches (nil deref below)
+ return *matchingMetric
+}
+
+func (gi *goInstance) getFilteredMetricFamilies(metricName string)
[]*prometheus_client.MetricFamily {
metricFamilies, err := reg.Gather()
if err != nil {
log.Errorf("Something went wrong when calling reg.Gather(), the
metricName is: %s", metricName)
@@ -558,9 +569,7 @@ func (gi *goInstance)
getMatchingMetricFromRegistry(metricName string) prometheu
// handle this.
log.Errorf("Too many metric families for metricName: %s " +
metricName)
}
- metricFunc := gi.getMatchingMetricFunc()
- matchingMetric := getFirstMatch(filteredMetricFamilies[0].Metric,
metricFunc)
- return *matchingMetric
+ return filteredMetricFamilies
}
func (gi *goInstance) getTotalReceived() float32 {
@@ -636,3 +645,39 @@ func (gi *goInstance) getTotalReceived1min() float32 {
val := metric.GetGauge().Value
return float32(*val)
}
+
+// getUserMetricsMap returns, for each user metric recorded for this function
+// (matched by its fqfn label) through FunctionContext.RecordMetric, the mean
+// of its observations (sample sum / sample count, or 0 when the count is 0),
+// keyed by the value of the "metric" label.
+func (gi *goInstance) getUserMetricsMap() map[string]float64 {
+	userMetricMap := map[string]float64{}
+	fqfn := gi.context.GetTenantAndNamespaceAndName()
+	// No family is gathered until a user metric has been observed, so the
+	// slice may be empty; range over it instead of indexing [0], which panics.
+	for _, family := range gi.getFilteredMetricFamilies(PulsarFunctionMetricsPrefix + UserMetric) {
+		for _, m := range family.GetMetric() {
+			var isFuncMetric bool
+			var userLabelName string
+			for _, l := range m.GetLabel() {
+				switch l.GetName() {
+				case "fqfn":
+					// The summary vec is package-wide; keep only this function's series.
+					isFuncMetric = l.GetValue() == fqfn
+				case "metric":
+					userLabelName = l.GetValue()
+				}
+			}
+			if !isFuncMetric || userLabelName == "" {
+				continue
+			}
+			summary := m.GetSummary()
+			var mean float64
+			if count := summary.GetSampleCount(); count > 0 {
+				mean = summary.GetSampleSum() / float64(count)
+			}
+			userMetricMap[userLabelName] = mean
+		}
+	}
+	return userMetricMap
+}
diff --git a/pulsar-function-go/pf/stats.go b/pulsar-function-go/pf/stats.go
index 5aea8b2..29c6aa0 100644
--- a/pulsar-function-go/pf/stats.go
+++ b/pulsar-function-go/pf/stats.go
@@ -35,6 +35,8 @@ var (
metricsLabelNames = []string{"tenant", "namespace", "name",
"instance_id", "cluster", "fqfn"}
exceptionLabelNames = []string{"error"}
exceptionMetricsLabelNames = append(metricsLabelNames,
exceptionLabelNames...)
+ userLabelNames = []string{"metric"}
+ userMetricLabelNames = append(metricsLabelNames,
userLabelNames...)
)
const (
@@ -52,6 +54,8 @@ const (
TotalUserExceptions1min = "user_exceptions_total_1min"
ProcessLatencyMs1min = "process_latency_ms_1min"
TotalReceived1min = "received_total_1min"
+
+ UserMetric = "user_metric"
)
// Declare Prometheus
@@ -122,6 +126,18 @@ var (
prometheus.GaugeOpts{
Name: PulsarFunctionMetricsPrefix + "system_exception",
Help: "Exception from system code."},
exceptionMetricsLabelNames)
+
+ userMetricSummary = prometheus.NewSummaryVec(
+ prometheus.SummaryOpts{
+ Name: PulsarFunctionMetricsPrefix + UserMetric,
+ Help: "User defined metric.",
+ Objectives: map[float64]float64{
+ 0.5: 0.01,
+ 0.9: 0.01,
+ 0.99: 0.01,
+ 0.999: 0.01,
+ },
+ }, userMetricLabelNames)
)
type MetricsServicer struct {
@@ -146,6 +162,7 @@ func init() {
reg.MustRegister(statTotalReceived1min)
reg.MustRegister(userExceptions)
reg.MustRegister(systemExceptions)
+ reg.MustRegister(userMetricSummary)
}
@@ -339,7 +356,9 @@ func (s *MetricsServicer) serve() {
// create a listener on metrics port
log.Infof("Starting metrics server on port %d",
s.goInstance.context.GetMetricsPort())
err := s.server.ListenAndServe()
- if err != nil {
+ switch err {
+ case nil, http.ErrServerClosed:
+ default:
log.Fatalf("failed to start metrics server: %v", err)
}
}()
diff --git a/pulsar-function-go/pf/stats_test.go
b/pulsar-function-go/pf/stats_test.go
index 3e38d10..d52b08b 100644
--- a/pulsar-function-go/pf/stats_test.go
+++ b/pulsar-function-go/pf/stats_test.go
@@ -213,4 +213,47 @@ func TestMetricsServer(t *testing.T) {
assert.Equal(t, nil, err)
assert.NotEmpty(t, body)
resp.Body.Close()
+ gi.close()
+ metricsServicer.close()
+}
+
+// nolint
+func TestUserMetrics(t *testing.T) {
+	gi := newGoInstance()
+	metricsServicer := NewMetricsServicer(gi)
+	metricsServicer.serve()
+
+	resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", gi.context.GetMetricsPort()))
+	assert.Equal(t, nil, err)
+	assert.NotEqual(t, nil, resp)
+	assert.Equal(t, 200, resp.StatusCode)
+	body, err := ioutil.ReadAll(resp.Body)
+	resp.Body.Close() // close now: resp is reassigned by the second scrape below
+	assert.Equal(t, nil, err)
+	assert.NotEmpty(t, body)
+	assert.NotContainsf(t, string(body), "pulsar_function_user_metric", "user metric should not appear yet")
+
+	testUserMetricValues := map[string]int{"test": 1, "test2": 2} // metric label -> the single value recorded for it
+
+	for labelname, value := range testUserMetricValues {
+		gi.context.RecordMetric(labelname, float64(value))
+	}
+	time.Sleep(time.Second * 1)
+	resp, err = http.Get(fmt.Sprintf("http://localhost:%d/metrics", gi.context.GetMetricsPort()))
+	assert.Equal(t, nil, err)
+	assert.NotEqual(t, nil, resp)
+	assert.Equal(t, 200, resp.StatusCode)
+	body, err = ioutil.ReadAll(resp.Body)
+	assert.Equal(t, nil, err)
+	assert.NotEmpty(t, body)
+	// With a single observation per series, every quantile equals the recorded value.
+	for labelname, value := range testUserMetricValues {
+		for _, quantile := range []string{"0.5", "0.9", "0.99", "0.999"} {
+			assert.Containsf(t, string(body), fmt.Sprintf("\n"+`pulsar_function_user_metric{cluster="pulsar-function-go",fqfn="//go-function",instance_id="pulsar-function",metric="%s",name="go-function",namespace="/",tenant="",quantile="%s"} %d`+"\n", labelname, quantile, value), "user metric %q quantile %s not found with value %d", labelname, quantile, value)
+		}
+	}
+
+	resp.Body.Close()
+	gi.close()
+	metricsServicer.close()
}
diff --git a/site2/docs/functions-develop.md b/site2/docs/functions-develop.md
index 2bff254..5853dee1 100644
--- a/site2/docs/functions-develop.md
+++ b/site2/docs/functions-develop.md
@@ -1054,7 +1054,22 @@ class MetricRecorderFunction(Function):
context.record_metric('elevens-count', 1)
```
<!--Go-->
-Currently, the feature is not available in Go.
+The Go SDK [`Context`](#context) object enables you to record metrics on a per-key basis. For example, the function below records a `hit-count` metric for every message it processes and an `elevens-count` metric only when the input is `eleven`:
+
+```go
+func metricRecorderFunction(ctx context.Context, in []byte) error {
+ inputstr := string(in)
+ fctx, ok := pf.FromContext(ctx)
+ if !ok {
+ return errors.New("get Go Functions Context error")
+ }
+ fctx.RecordMetric("hit-count", 1)
+ if inputstr == "eleven" {
+ fctx.RecordMetric("elevens-count", 1)
+ }
+ return nil
+}
+```
<!--END_DOCUSAURUS_CODE_TABS-->