This is an automated email from the ASF dual-hosted git repository.

rfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e6cf666  [Issue 9772][Go Functions]Allow User Metrics (#12013)
e6cf666 is described below

commit e6cf6660ca0d3e18270e5c638d49611ca3e8c453
Author: Andy Walker <[email protected]>
AuthorDate: Wed Feb 23 19:09:09 2022 -0500

    [Issue 9772][Go Functions]Allow User Metrics (#12013)
    
    * Support user-provided metrics
    
    * add docs
    
    * fix typo
    
    * fix function example
    
    * Address CR requests
    
    - Moved RecordMetric into context
    - Initialized user summaryvec with objectives to match Java impl
    - provided getMetrics() results for pulsar-admin/pulsarctl
    
    * remove spurious convert
    
    * tests for user metrics
    
    * Merge tests with subtests to avoid multiple metric
    server spin-ups and mark with nolint.
    
    * Revert "Merge tests with subtests to avoid multiple metric"
    
    This reverts commit 64d888c57251b69abd21e28d2eafd628fedf35f8.
    
    * Keep metricsServicer.close() from ruining tests
    - this will help in testing the fix for #12314 as well
---
 .../examples/userMetricsFunc/userMetricsFunc.go    | 44 +++++++++++++++++++
 pulsar-function-go/pf/context.go                   | 20 +++++++++
 pulsar-function-go/pf/instance.go                  | 51 ++++++++++++++++++++--
 pulsar-function-go/pf/stats.go                     | 21 ++++++++-
 pulsar-function-go/pf/stats_test.go                | 43 ++++++++++++++++++
 site2/docs/functions-develop.md                    | 17 +++++++-
 6 files changed, 191 insertions(+), 5 deletions(-)

diff --git a/pulsar-function-go/examples/userMetricsFunc/userMetricsFunc.go 
b/pulsar-function-go/examples/userMetricsFunc/userMetricsFunc.go
new file mode 100644
index 0000000..17764d2
--- /dev/null
+++ b/pulsar-function-go/examples/userMetricsFunc/userMetricsFunc.go
@@ -0,0 +1,44 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+       "context"
+       "errors"
+
+       "github.com/apache/pulsar/pulsar-function-go/pf"
+)
+
+func metricRecorderFunction(ctx context.Context, in []byte) error {
+       inputstr := string(in)
+       fctx, ok := pf.FromContext(ctx)
+       if !ok {
+               return errors.New("get Go Functions Context error")
+       }
+       fctx.RecordMetric("hit-count", 1)
+       if inputstr == "eleven" {
+               fctx.RecordMetric("elevens-count", 1)
+       }
+       return nil
+}
+
+func main() {
+       pf.Start(metricRecorderFunction)
+}
diff --git a/pulsar-function-go/pf/context.go b/pulsar-function-go/pf/context.go
index 599b2eb..fd3da9b 100644
--- a/pulsar-function-go/pf/context.go
+++ b/pulsar-function-go/pf/context.go
@@ -22,9 +22,11 @@ package pf
 import (
        "context"
        "encoding/json"
+       "sync"
        "time"
 
        "github.com/apache/pulsar-client-go/pulsar"
+       "github.com/prometheus/client_golang/prometheus"
 )
 
 // FunctionContext provides contextual information to the executing function.
@@ -36,6 +38,7 @@ type FunctionContext struct {
        userConfigs   map[string]interface{}
        logAppender   *LogAppender
        outputMessage func(topic string) pulsar.Producer
+       userMetrics   sync.Map
        record        pulsar.Message
 }
 
@@ -174,6 +177,23 @@ func (c *FunctionContext) GetMetricsPort() int {
        return c.instanceConf.metricsPort
 }
 
+//RecordMetric records an observation to the user_metric summary with the provided value
+func (c *FunctionContext) RecordMetric(metricName string, metricValue float64) {
+       v, ok := c.userMetrics.Load(metricName)
+       if !ok {
+               v, _ = c.userMetrics.LoadOrStore(metricName, 
userMetricSummary.WithLabelValues(
+                       c.GetFuncTenant(),
+                       c.GetTenantAndNamespace(),
+                       c.GetFuncName(),
+                       c.GetFuncID(),
+                       c.GetClusterName(),
+                       c.GetTenantAndNamespaceAndName(),
+                       metricName,
+               ))
+       }
+       v.(prometheus.Observer).Observe(metricValue)
+}
+
 // An unexported type to be used as the key for types in this package. This
 // prevents collisions with keys defined in other packages.
 type key struct{}
diff --git a/pulsar-function-go/pf/instance.go 
b/pulsar-function-go/pf/instance.go
index def254b..b03a79b 100644
--- a/pulsar-function-go/pf/instance.go
+++ b/pulsar-function-go/pf/instance.go
@@ -507,6 +507,7 @@ func (gi *goInstance) getMetrics() *pb.MetricsData {
        totalUserExceptions1min := gi.getTotalUserExceptions1min()
        totalSysExceptions1min := gi.getTotalSysExceptions1min()
        //avg_process_latency_ms_1min := gi.get_avg_process_latency_1min()
+       userMetricsMap := gi.getUserMetricsMap()
 
        metricsData := pb.MetricsData{}
        // total metrics
@@ -522,6 +523,9 @@ func (gi *goInstance) getMetrics() *pb.MetricsData {
        metricsData.SystemExceptionsTotal_1Min = int64(totalSysExceptions1min)
        metricsData.UserExceptionsTotal_1Min = int64(totalUserExceptions1min)
 
+       // user metrics
+       metricsData.UserMetrics = userMetricsMap
+
        return &metricsData
 }
 
@@ -546,6 +550,13 @@ func (gi *goInstance) getMatchingMetricFunc() func(lbl 
*prometheus_client.LabelP
 }
 
 func (gi *goInstance) getMatchingMetricFromRegistry(metricName string) 
prometheus_client.Metric {
+       filteredMetricFamilies := gi.getFilteredMetricFamilies(metricName)
+       metricFunc := gi.getMatchingMetricFunc()
+       matchingMetric := getFirstMatch(filteredMetricFamilies[0].Metric, 
metricFunc)
+       return *matchingMetric
+}
+
+func (gi *goInstance) getFilteredMetricFamilies(metricName string) 
[]*prometheus_client.MetricFamily {
        metricFamilies, err := reg.Gather()
        if err != nil {
                log.Errorf("Something went wrong when calling reg.Gather(), the 
metricName is: %s", metricName)
@@ -558,9 +569,7 @@ func (gi *goInstance) 
getMatchingMetricFromRegistry(metricName string) prometheu
                // handle this.
                log.Errorf("Too many metric families for metricName: %s " + 
metricName)
        }
-       metricFunc := gi.getMatchingMetricFunc()
-       matchingMetric := getFirstMatch(filteredMetricFamilies[0].Metric, 
metricFunc)
-       return *matchingMetric
+       return filteredMetricFamilies
 }
 
 func (gi *goInstance) getTotalReceived() float32 {
@@ -636,3 +645,39 @@ func (gi *goInstance) getTotalReceived1min() float32 {
        val := metric.GetGauge().Value
        return float32(*val)
 }
+
+func (gi *goInstance) getUserMetricsMap() map[string]float64 {
+       userMetricMap := map[string]float64{}
+       filteredMetricFamilies := 
gi.getFilteredMetricFamilies(PulsarFunctionMetricsPrefix + UserMetric)
+       for _, m := range filteredMetricFamilies[0].GetMetric() {
+               var isFuncMetric bool
+               var userLabelName string
+       VERIFY_USER_METRIC:
+               for _, l := range m.GetLabel() {
+                       switch l.GetName() {
+                       case "fqfn":
+                               if l.GetValue() == 
gi.context.GetTenantAndNamespaceAndName() {
+                                       isFuncMetric = true
+                                       if userLabelName != "" {
+                                               break VERIFY_USER_METRIC
+                                       }
+                               }
+                       case "metric":
+                               userLabelName = l.GetValue()
+                               if isFuncMetric {
+                                       break VERIFY_USER_METRIC
+                               }
+                       }
+               }
+               if isFuncMetric && userLabelName != "" {
+                       summary := m.GetSummary()
+                       count := summary.GetSampleCount()
+                       if count <= 0 {
+                               userMetricMap[userLabelName] = 0
+                       } else {
+                               userMetricMap[userLabelName] = 
summary.GetSampleSum() / float64(count)
+                       }
+               }
+       }
+       return userMetricMap
+}
diff --git a/pulsar-function-go/pf/stats.go b/pulsar-function-go/pf/stats.go
index 5aea8b2..29c6aa0 100644
--- a/pulsar-function-go/pf/stats.go
+++ b/pulsar-function-go/pf/stats.go
@@ -35,6 +35,8 @@ var (
        metricsLabelNames          = []string{"tenant", "namespace", "name", 
"instance_id", "cluster", "fqfn"}
        exceptionLabelNames        = []string{"error"}
        exceptionMetricsLabelNames = append(metricsLabelNames, 
exceptionLabelNames...)
+       userLabelNames             = []string{"metric"}
+       userMetricLabelNames       = append(metricsLabelNames, 
userLabelNames...)
 )
 
 const (
@@ -52,6 +54,8 @@ const (
        TotalUserExceptions1min        = "user_exceptions_total_1min"
        ProcessLatencyMs1min           = "process_latency_ms_1min"
        TotalReceived1min              = "received_total_1min"
+
+       UserMetric = "user_metric"
 )
 
 // Declare Prometheus
@@ -122,6 +126,18 @@ var (
                prometheus.GaugeOpts{
                        Name: PulsarFunctionMetricsPrefix + "system_exception",
                        Help: "Exception from system code."}, 
exceptionMetricsLabelNames)
+
+       userMetricSummary = prometheus.NewSummaryVec(
+               prometheus.SummaryOpts{
+                       Name: PulsarFunctionMetricsPrefix + UserMetric,
+                       Help: "User defined metric.",
+                       Objectives: map[float64]float64{
+                               0.5:   0.01,
+                               0.9:   0.01,
+                               0.99:  0.01,
+                               0.999: 0.01,
+                       },
+               }, userMetricLabelNames)
 )
 
 type MetricsServicer struct {
@@ -146,6 +162,7 @@ func init() {
        reg.MustRegister(statTotalReceived1min)
        reg.MustRegister(userExceptions)
        reg.MustRegister(systemExceptions)
+       reg.MustRegister(userMetricSummary)
 
 }
 
@@ -339,7 +356,9 @@ func (s *MetricsServicer) serve() {
                // create a listener on metrics port
                log.Infof("Starting metrics server on port %d", 
s.goInstance.context.GetMetricsPort())
                err := s.server.ListenAndServe()
-               if err != nil {
+               switch err {
+               case nil, http.ErrServerClosed:
+               default:
                        log.Fatalf("failed to start metrics server: %v", err)
                }
        }()
diff --git a/pulsar-function-go/pf/stats_test.go 
b/pulsar-function-go/pf/stats_test.go
index 3e38d10..d52b08b 100644
--- a/pulsar-function-go/pf/stats_test.go
+++ b/pulsar-function-go/pf/stats_test.go
@@ -213,4 +213,47 @@ func TestMetricsServer(t *testing.T) {
        assert.Equal(t, nil, err)
        assert.NotEmpty(t, body)
        resp.Body.Close()
+       gi.close()
+       metricsServicer.close()
+}
+
+// nolint
+func TestUserMetrics(t *testing.T) {
+       gi := newGoInstance()
+       metricsServicer := NewMetricsServicer(gi)
+       metricsServicer.serve()
+
+       resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", gi.context.GetMetricsPort()))
+       assert.Equal(t, nil, err)
+       assert.NotEqual(t, nil, resp)
+       assert.Equal(t, 200, resp.StatusCode)
+       body, err := ioutil.ReadAll(resp.Body)
+       assert.Equal(t, nil, err)
+       assert.NotEmpty(t, body)
+       assert.NotContainsf(t, string(body), "pulsar_function_user_metric", 
"user metric should not appear yet")
+
+       testUserMetricValues := map[string]int{"test": 1, "test2": 2}
+
+       for labelname, value := range testUserMetricValues {
+               gi.context.RecordMetric(labelname, float64(value))
+       }
+
+       time.Sleep(time.Second * 1)
+       resp, err = http.Get(fmt.Sprintf("http://localhost:%d/metrics", gi.context.GetMetricsPort()))
+       assert.Equal(t, nil, err)
+       assert.NotEqual(t, nil, resp)
+       assert.Equal(t, 200, resp.StatusCode)
+       body, err = ioutil.ReadAll(resp.Body)
+       assert.Equal(t, nil, err)
+       assert.NotEmpty(t, body)
+
+       for labelname, value := range testUserMetricValues {
+               for _, quantile := range []string{"0.5", "0.9", "0.99", 
"0.999"} {
+                       assert.Containsf(t, string(body), fmt.Sprintf("\n"+`pulsar_function_user_metric{cluster="pulsar-function-go",fqfn="//go-function",instance_id="pulsar-function",metric="%s",name="go-function",namespace="/",tenant="",quantile="%s"} %d`+"\n", labelname, quantile, value), "user metric %q quantile %s not found with value %d", labelname, quantile, value)
+               }
+       }
+
+       resp.Body.Close()
+       gi.close()
+       metricsServicer.close()
 }
diff --git a/site2/docs/functions-develop.md b/site2/docs/functions-develop.md
index 2bff254..5853dee1 100644
--- a/site2/docs/functions-develop.md
+++ b/site2/docs/functions-develop.md
@@ -1054,7 +1054,22 @@ class MetricRecorderFunction(Function):
             context.record_metric('elevens-count', 1)
 ```
 <!--Go-->
-Currently, the feature is not available in Go.
+The Go SDK [`Context`](#context) object enables you to record metrics on a per-key basis. For example, you can set a metric for the `hit-count` key and a different metric for the `elevens-count` key every time the function processes a message:
+
+```go
+func metricRecorderFunction(ctx context.Context, in []byte) error {
+       inputstr := string(in)
+       fctx, ok := pf.FromContext(ctx)
+       if !ok {
+               return errors.New("get Go Functions Context error")
+       }
+       fctx.RecordMetric("hit-count", 1)
+       if inputstr == "eleven" {
+               fctx.RecordMetric("elevens-count", 1)
+       }
+       return nil
+}
+```
 
 <!--END_DOCUSAURUS_CODE_TABS-->
 

Reply via email to