This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 9a271ae4454 [fix][fn] check user metric len before iterating (#20021)
9a271ae4454 is described below

commit 9a271ae445471d8ba516abe53747eca0ae617b69
Author: Andy Walker <[email protected]>
AuthorDate: Wed Apr 12 20:28:42 2023 -0400

    [fix][fn] check user metric len before iterating (#20021)
    
    Co-authored-by: Andy Walker <[email protected]>
    (cherry picked from commit 52e8144587548a692e550a8538f4d2667b5499d6)
---
 pulsar-function-go/pf/instance.go                  |  3 ++
 .../pf/instanceControlServicer_test.go             | 53 ++++++++++++++++++++++
 pulsar-function-go/pf/stats_test.go                | 27 +++++++++++
 3 files changed, 83 insertions(+)

diff --git a/pulsar-function-go/pf/instance.go b/pulsar-function-go/pf/instance.go
index 5d17cfe0c33..a82273031ec 100644
--- a/pulsar-function-go/pf/instance.go
+++ b/pulsar-function-go/pf/instance.go
@@ -669,6 +669,9 @@ func (gi *goInstance) getTotalReceived1min() float32 {
 func (gi *goInstance) getUserMetricsMap() map[string]float64 {
        userMetricMap := map[string]float64{}
        filteredMetricFamilies := 
gi.getFilteredMetricFamilies(PulsarFunctionMetricsPrefix + UserMetric)
+       if len(filteredMetricFamilies) == 0 {
+               return userMetricMap
+       }
        for _, m := range filteredMetricFamilies[0].GetMetric() {
                var isFuncMetric bool
                var userLabelName string
diff --git a/pulsar-function-go/pf/instanceControlServicer_test.go b/pulsar-function-go/pf/instanceControlServicer_test.go
index 836ec6e5c79..9344d0a5915 100644
--- a/pulsar-function-go/pf/instanceControlServicer_test.go
+++ b/pulsar-function-go/pf/instanceControlServicer_test.go
@@ -21,6 +21,7 @@ package pf
 
 import (
        "context"
+       "fmt"
        "log"
        "net"
        "testing"
@@ -76,3 +77,55 @@ func TestInstanceControlServicer_serve_creates_valid_instance(t *testing.T) {
        log.Printf("Response: %+v", resp.Success)
        assert.Equal(t, resp.Success, true)
 }
+
+// instanceCommunicationClient starts an in-memory (bufconn) gRPC server that
+// serves an InstanceControlServicer wrapping instance, and returns a client
+// connected to it. The test fails immediately if instance is nil or if the
+// dial fails. Everything started here (listener, server, connection and, when
+// the test has a deadline, the dial context) is released through t.Cleanup.
+//
+// Note: the listener is stored in lis, a variable declared outside this
+// function, so calling this helper replaces whatever lis held before.
+func instanceCommunicationClient(t *testing.T, instance *goInstance) pb.InstanceControlClient {
+       t.Helper()
+       if instance == nil {
+               t.Fatalf("cannot create communication client for nil instance")
+       }
+
+       // Bound the dial by the test deadline when one is configured.
+       dialCtx := context.Background()
+       if deadline, hasDeadline := t.Deadline(); hasDeadline {
+               var cancel context.CancelFunc
+               dialCtx, cancel = context.WithDeadline(context.Background(), deadline)
+               t.Cleanup(cancel)
+       }
+
+       lis = bufconn.Listen(bufSize)
+       t.Cleanup(func() { lis.Close() })
+
+       srv := grpc.NewServer()
+       t.Cleanup(srv.Stop)
+
+       // The service has to be registered before Serve is called.
+       pb.RegisterInstanceControlServer(srv, &InstanceControlServicer{instance})
+
+       t.Logf("Serving InstanceCommunication on port %d", instance.context.GetPort())
+
+       // Serve blocks, so it runs on its own goroutine; an error from it
+       // cannot be reported through t from here and is raised as a panic.
+       go func() {
+               serveErr := srv.Serve(lis)
+               if serveErr != nil {
+                       panic(fmt.Sprintf("grpc server exited with error: %v", serveErr))
+               }
+       }()
+
+       // With the server running, connect a client over the same listener.
+       dialOpts := []grpc.DialOption{
+               grpc.WithContextDialer(getBufDialer(lis)),
+               grpc.WithInsecure(),
+       }
+       conn, err := grpc.DialContext(dialCtx, "bufnet", dialOpts...)
+       if err != nil {
+               t.Fatalf("Failed to dial bufnet: %v", err)
+       }
+       t.Cleanup(func() { conn.Close() })
+
+       return pb.NewInstanceControlClient(conn)
+}
diff --git a/pulsar-function-go/pf/stats_test.go b/pulsar-function-go/pf/stats_test.go
index d52b08b7173..7b415ef5eff 100644
--- a/pulsar-function-go/pf/stats_test.go
+++ b/pulsar-function-go/pf/stats_test.go
@@ -20,6 +20,7 @@
 package pf
 
 import (
+       "context"
        "fmt"
        "io/ioutil"
        "math"
@@ -28,6 +29,7 @@ import (
        "time"
 
        "github.com/golang/protobuf/proto"
+       "github.com/golang/protobuf/ptypes/empty"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/stretchr/testify/assert"
 
@@ -257,3 +259,28 @@ func TestUserMetrics(t *testing.T) {
        gi.close()
        metricsServicer.close()
 }
+
+func TestInstanceControlMetrics(t *testing.T) {
+       instance := newGoInstance()
+       t.Cleanup(instance.close)
+       instanceClient := instanceCommunicationClient(t, instance)
+       _, err := instanceClient.GetMetrics(context.Background(), 
&empty.Empty{})
+       assert.NoError(t, err, "err communicating with instance control: %v", 
err)
+
+       testLabels := []string{"userMetricControlTest1", 
"userMetricControlTest2"}
+       for _, label := range testLabels {
+               assert.NotContainsf(t, label, "user metrics should not yet 
contain %s", label)
+       }
+
+       for value, label := range testLabels {
+               instance.context.RecordMetric(label, float64(value+1))
+       }
+       time.Sleep(time.Second)
+
+       metrics, err := instanceClient.GetMetrics(context.Background(), 
&empty.Empty{})
+       assert.NoError(t, err, "err communicating with instance control: %v", 
err)
+       for value, label := range testLabels {
+               assert.Containsf(t, metrics.UserMetrics, label, "user metrics 
should contain metric %s", label)
+               assert.EqualValuesf(t, value+1, metrics.UserMetrics[label], 
"user metric %s != %d", label, value+1)
+       }
+}

Reply via email to