devinbost commented on a change in pull request #6105: [pulsar-function-go] Add 
statistics and Prometheus to Go Function instances for production readiness
URL: https://github.com/apache/pulsar/pull/6105#discussion_r406349549
 
 

 ##########
 File path: pulsar-function-go/pf/instance.go
 ##########
 @@ -374,17 +409,201 @@ func (gi *goInstance) healthCheck() 
*pb.HealthCheckResult {
 }
 
 func (gi *goInstance) getFunctionStatus() *pb.FunctionStatus {
-       return nil // Not implemented until we add the statistics features
+       status := pb.FunctionStatus{}
+       status.Running = true
+       totalReceived := gi.getTotalReceived()
+       totalProcessedSuccessfully := gi.getTotalProcessedSuccessfully()
+       totalUserExceptions := gi.getTotalUserExceptions()
+       totalSysExceptions := gi.getTotalSysExceptions()
+       avgProcessLatencyMs := gi.getAvgProcessLatency()
+       lastInvocation := gi.getLastInvocation()
+
+       status.NumReceived = int64(totalReceived)
+       status.NumSuccessfullyProcessed = int64(totalProcessedSuccessfully)
+       status.NumUserExceptions = int64(totalUserExceptions)
+       status.InstanceId = strconv.Itoa(gi.context.instanceConf.instanceID)
+
+       status.NumUserExceptions = int64(totalUserExceptions)
+       for _, exPair := range gi.stats.latestUserException {
+               toAdd := pb.FunctionStatus_ExceptionInformation{}
+               toAdd.ExceptionString = exPair.exception.Error()
+               toAdd.MsSinceEpoch = exPair.timestamp
+               status.LatestUserExceptions = 
append(status.LatestUserExceptions, &toAdd)
+       }
+
+       status.NumSystemExceptions = int64(totalSysExceptions)
+       for _, exPair := range gi.stats.latestSysException {
+               toAdd := pb.FunctionStatus_ExceptionInformation{}
+               toAdd.ExceptionString = exPair.exception.Error()
+               toAdd.MsSinceEpoch = exPair.timestamp
+               status.LatestSystemExceptions = 
append(status.LatestSystemExceptions, &toAdd)
+       }
+       status.AverageLatency = float64(avgProcessLatencyMs)
+       status.LastInvocationTime = int64(lastInvocation)
+       return &status
+}
+
+func (gi *goInstance) getMetrics() *pb.MetricsData {
+       totalReceived := gi.getTotalReceived()
+       totalProcessedSuccessfully := gi.getTotalProcessedSuccessfully()
+       totalUserExceptions := gi.getTotalUserExceptions()
+       totalSysExceptions := gi.getTotalSysExceptions()
+       avgProcessLatencyMs := gi.getAvgProcessLatency()
+       lastInvocation := gi.getLastInvocation()
+
+       totalReceived1min := gi.getTotalReceived1min()
+       totalProcessedSuccessfully1min := gi.getTotalProcessedSuccessfully1min()
+       totalUserExceptions1min := gi.getTotalUserExceptions1min()
+       totalSysExceptions1min := gi.getTotalSysExceptions1min()
+       //avg_process_latency_ms_1min := gi.get_avg_process_latency_1min()
+
+       metricsData := pb.MetricsData{}
+       // total metrics
+       metricsData.ReceivedTotal = int64(totalReceived)
+       metricsData.ProcessedSuccessfullyTotal = 
int64(totalProcessedSuccessfully)
+       metricsData.SystemExceptionsTotal = int64(totalSysExceptions)
+       metricsData.UserExceptionsTotal = int64(totalUserExceptions)
+       metricsData.AvgProcessLatency = float64(avgProcessLatencyMs)
+       metricsData.LastInvocation = int64(lastInvocation)
+       // 1min metrics
+       metricsData.ReceivedTotal_1Min = int64(totalReceived1min)
+       metricsData.ProcessedSuccessfullyTotal_1Min = 
int64(totalProcessedSuccessfully1min)
+       metricsData.SystemExceptionsTotal_1Min = int64(totalSysExceptions1min)
+       metricsData.UserExceptionsTotal_1Min = int64(totalUserExceptions1min)
+       //metrics_data.AvgProcessLatency_1Min = avg_process_latency_ms_1min
+
+       // get any user metrics
+       // Not sure yet where these are stored.
+       /*
+          user_metrics := self.contextimpl.get_metrics()
+          for metric_name, value in user_metrics.items():
+            metrics_data.userMetrics[metric_name] = value
+       */
+
+       return &metricsData
 }
 
 func (gi *goInstance) getAndResetMetrics() *pb.MetricsData {
-       return nil // Not implemented until we add the statistics features
+       metricsData := gi.getMetrics()
+       gi.resetMetrics()
+       return metricsData
 }
 
 func (gi *goInstance) resetMetrics() *empty.Empty {
-       return nil // Not implemented until we add the statistics features
+       gi.stats.reset()
+       return &empty.Empty{}
 }
 
-func (gi *goInstance) getMetrics() *pb.MetricsData {
-       return nil // Not implemented until we add the statistics features
+// This method is used to get the required metrics for Prometheus.
+// Note that this doesn't distinguish between parallel function instances!
+func (gi *goInstance) getMatchingMetricFunc() func(lbl 
*io_prometheus_client.LabelPair) bool {
+       matchMetricFunc := func(lbl *io_prometheus_client.LabelPair) bool {
+               return *lbl.Name == "fqfn" && *lbl.Value == 
gi.context.GetTenantAndNamespaceAndName()
+       }
+       return matchMetricFunc
+}
+
+// e.g. metricName = "pulsar_function_process_latency_ms"
+func (gi *goInstance) getMatchingMetricFromRegistry(metricName string) 
io_prometheus_client.Metric {
+       metricFamilies, err := reg.Gather()
+       if err != nil {
+               log.Error("Something went wrong when calling reg.Gather() in 
getMatchingMetricFromRegistry(..) for " + metricName)
+       }
+       matchFamilyFunc := func(vect *io_prometheus_client.MetricFamily) bool {
+               return *vect.Name == metricName
+       }
+       fiteredMetricFamilies := filter(metricFamilies, matchFamilyFunc)
+       if len(fiteredMetricFamilies) > 1 {
+               // handle this.
+               log.Error("Too many metric families for metricName = " + 
metricName)
+               // Should we panic here instead of report an error since it 
reflects a code problem, not a user problem?
+       }
+       metricFunc := gi.getMatchingMetricFunc()
+       matchingMetric := getFirstMatch(fiteredMetricFamilies[0].Metric, 
metricFunc)
+       return *matchingMetric
+}
+
+func (gi *goInstance) getTotalReceived() float32 {
+       // "pulsar_function_" + "received_total", NewGaugeVec.
+       metric := gi.getMatchingMetricFromRegistry(PulsarFunctionMetricsPrefix 
+ TotalReceived)
+       val := metric.GetGauge().Value
+       return float32(*val)
+}
+func (gi *goInstance) getTotalProcessedSuccessfully() float32 {
+       metric := gi.getMatchingMetricFromRegistry(PulsarFunctionMetricsPrefix 
+ TotalSuccessfullyProcessed)
+       // "pulsar_function_" + "processed_successfully_total", NewGaugeVec.
+       val := metric.GetGauge().Value
+       return float32(*val)
+}
+func (gi *goInstance) getTotalSysExceptions() float32 {
+       metric := gi.getMatchingMetricFromRegistry(PulsarFunctionMetricsPrefix 
+ TotalSystemExceptions)
+       // "pulsar_function_"+ "system_exceptions_total", NewGaugeVec.
+       val := metric.GetGauge().Value
+       return float32(*val)
+}
+
+func (gi *goInstance) getTotalUserExceptions() float32 {
+       metric := gi.getMatchingMetricFromRegistry(PulsarFunctionMetricsPrefix 
+ TotalUserExceptions)
+       // "pulsar_function_" + "user_exceptions_total", NewGaugeVec
+       val := metric.GetGauge().Value
+       return float32(*val)
+}
+
+func (gi *goInstance) getAvgProcessLatency() float32 {
+       metric := gi.getMatchingMetricFromRegistry(PulsarFunctionMetricsPrefix 
+ ProcessLatencyMs)
+       // "pulsar_function_" + "process_latency_ms", SummaryVec.
+       count := metric.GetSummary().SampleCount
+       sum := metric.GetSummary().SampleSum
+       if *count <= 0.0 {
+               return 0.0
+       }
+       return float32(*sum) / float32(*count)
+}
+
+func (gi *goInstance) getLastInvocation() float32 {
+       metric := gi.getMatchingMetricFromRegistry(PulsarFunctionMetricsPrefix 
+ LastInvocation)
+       // "pulsar_function_" + "last_invocation", GaugeVec.
+       val := metric.GetGauge().Value
+       return float32(*val)
+}
+
+func (gi *goInstance) getTotalProcessedSuccessfully1min() float32 {
+       metric := gi.getMatchingMetricFromRegistry(PulsarFunctionMetricsPrefix 
+ TotalSuccessfullyProcessed1min)
+       // "pulsar_function_" + "processed_successfully_total_1min", GaugeVec.
+       val := metric.GetGauge().Value
+       return float32(*val)
+}
+
+func (gi *goInstance) getTotalSysExceptions1min() float32 {
+       metric := gi.getMatchingMetricFromRegistry(PulsarFunctionMetricsPrefix 
+ TotalSystemExceptions1min)
+       // "pulsar_function_" + "system_exceptions_total_1min", GaugeVec
+       val := metric.GetGauge().Value
+       return float32(*val)
+}
+
+func (gi *goInstance) getTotalUserExceptions1min() float32 {
+       metric := gi.getMatchingMetricFromRegistry(PulsarFunctionMetricsPrefix 
+ TotalUserExceptions1min)
+       // "pulsar_function_" + "user_exceptions_total_1min", GaugeVec
+       val := metric.GetGauge().Value
+       return float32(*val)
+}
+
+/*
+func (gi *goInstance) get_avg_process_latency_1min() float32 {
 
 Review comment:
   @jiazhai This is a method that isn't available due to the Prometheus 
architecture change they rolled out to their Go library. So, we need to find 
another way to capture the 1 min metrics. The comment was to note that this 
feature is currently missing. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to