sijie closed pull request #3107: report function exceptions via prometheus
URL: https://github.com/apache/pulsar/pull/3107
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index fa35ca14f2..f4ebdccb0a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -407,68 +407,79 @@ public void testPulsarFunctionStats() throws Exception {
         Metric m = metrics.get("pulsar_function_received_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_function_received_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_function_user_exceptions_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_function_user_exceptions_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_function_process_latency_ms");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
         assertEquals(m.value, Double.NaN);
         m = metrics.get("pulsar_function_process_latency_ms_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
         assertEquals(m.value, Double.NaN);
         m = metrics.get("pulsar_function_system_exceptions_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_function_system_exceptions_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_function_last_invocation");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_function_processed_successfully_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_function_processed_successfully_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
         assertEquals(m.value, 0.0);
 
 
@@ -554,68 +565,79 @@ public void testPulsarFunctionStats() throws Exception {
         m = metrics.get("pulsar_function_received_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
         assertEquals(m.value, (double) totalMsgs);
         m = metrics.get("pulsar_function_received_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
         assertEquals(m.value, (double) totalMsgs);
         m = metrics.get("pulsar_function_user_exceptions_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_function_user_exceptions_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_function_process_latency_ms");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
         assertTrue(m.value > 0.0);
         m = metrics.get("pulsar_function_process_latency_ms_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
         assertTrue(m.value > 0.0);
         m = metrics.get("pulsar_function_system_exceptions_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_function_system_exceptions_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_function_last_invocation");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
         assertTrue(m.value > 0.0);
         m = metrics.get("pulsar_function_processed_successfully_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
         assertEquals(m.value, (double) totalMsgs);
         m = metrics.get("pulsar_function_processed_successfully_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("function"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
         assertEquals(m.value, (double) totalMsgs);
     }
 
diff --git a/pulsar-client-cpp/python/setup.py 
b/pulsar-client-cpp/python/setup.py
index 952c57b684..ce8d2594cf 100644
--- a/pulsar-client-cpp/python/setup.py
+++ b/pulsar-client-cpp/python/setup.py
@@ -70,6 +70,9 @@ def build_extension(self, ext):
     license="Apache License v2.0",
     url="http://pulsar.apache.org/",
     install_requires=[
-        'grpcio', 'protobuf', "prometheus_client"
+        'grpcio', 'protobuf',
+        # functions dependencies
+        "prometheus_client",
+        "ratelimit"
     ],
 )
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
index c1b7574199..f9a3c77c8f 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
@@ -27,10 +27,12 @@
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.util.RateLimiter;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 
 import java.io.IOException;
 import java.io.StringWriter;
+import java.util.Arrays;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -43,7 +45,13 @@
 @Setter
 public class FunctionStatsManager implements AutoCloseable {
 
-    static final String[] metricsLabelNames = {"tenant", "namespace", 
"function", "instance_id", "cluster"};
+    static final String[] metricsLabelNames = {"tenant", "namespace", 
"function", "instance_id", "cluster", "fqfn"};
+    static final String[] exceptionMetricsLabelNames;
+    static {
+        exceptionMetricsLabelNames = Arrays.copyOf(metricsLabelNames, 
metricsLabelNames.length + 2);
+        exceptionMetricsLabelNames[metricsLabelNames.length] = "error";
+        exceptionMetricsLabelNames[metricsLabelNames.length + 1] = "ts";
+    }
 
     public static final String PULSAR_FUNCTION_METRICS_PREFIX = 
"pulsar_function_";
     public final static String USER_METRIC_PREFIX = "user_metric_";
@@ -88,6 +96,12 @@
 
     final Counter statTotalRecordsRecieved1min;
 
+    // exceptions
+
+    final Gauge userExceptions;
+
+    final Gauge sysExceptions;
+
     private String[] metricsLabels;
 
     private ScheduledFuture<?> scheduledFuture;
@@ -99,6 +113,10 @@
     @Getter
     private 
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> 
latestSystemExceptions = EvictingQueue.create(10);
 
+    private final RateLimiter userExceptionRateLimiter;
+
+    private final RateLimiter sysExceptionRateLimiter;
+
     public FunctionStatsManager(CollectorRegistry collectorRegistry, String[] 
metricsLabels, ScheduledExecutorService scheduledExecutorService) {
 
         this.collectorRegistry = collectorRegistry;
@@ -179,6 +197,18 @@ public FunctionStatsManager(CollectorRegistry 
collectorRegistry, String[] metric
                 .labelNames(metricsLabelNames)
                 .register(collectorRegistry);
 
+        userExceptions = Gauge.build()
+                .name(PULSAR_FUNCTION_METRICS_PREFIX + "user_exception")
+                .labelNames(exceptionMetricsLabelNames)
+                .help("Exception from user code.")
+                .register(collectorRegistry);
+
+        sysExceptions = Gauge.build()
+                .name(PULSAR_FUNCTION_METRICS_PREFIX + "system_exception")
+                .labelNames(exceptionMetricsLabelNames)
+                .help("Exception from system code.")
+                .register(collectorRegistry);
+
         scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(new 
Runnable() {
             @Override
             public void run() {
@@ -189,21 +219,41 @@ public void run() {
                 }
             }
         }, 1, 1, TimeUnit.MINUTES);
+
+        userExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 
5, 1, TimeUnit.MINUTES);
+        sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 
1, TimeUnit.MINUTES);
     }
 
     public void addUserException(Exception ex) {
+        long ts = System.currentTimeMillis();
         InstanceCommunication.FunctionStatus.ExceptionInformation info =
                     
InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder()
-                    
.setExceptionString(ex.getMessage()).setMsSinceEpoch(System.currentTimeMillis()).build();
+                    
.setExceptionString(ex.getMessage()).setMsSinceEpoch(ts).build();
         latestUserExceptions.add(info);
+
+        // report exception via prometheus
+        if (userExceptionRateLimiter.tryAcquire()) {
+            String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, 
metricsLabels.length + 2);
+            exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = 
ex.getMessage();
+            exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = 
String.valueOf(ts);
+            userExceptions.labels(exceptionMetricsLabels).set(1.0);
+        }
     }
 
     public void addSystemException(Throwable ex) {
+        long ts = System.currentTimeMillis();
         InstanceCommunication.FunctionStatus.ExceptionInformation info =
                 
InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder()
-                        
.setExceptionString(ex.getMessage()).setMsSinceEpoch(System.currentTimeMillis()).build();
+                        
.setExceptionString(ex.getMessage()).setMsSinceEpoch(ts).build();
         latestSystemExceptions.add(info);
 
+        // report exception via prometheus
+        if (sysExceptionRateLimiter.tryAcquire()) {
+            String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, 
metricsLabels.length + 2);
+            exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = 
ex.getMessage();
+            exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = 
String.valueOf(ts);
+            sysExceptions.labels(exceptionMetricsLabels).set(1.0);
+        }
     }
 
     public void incrTotalReceived() {
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java
index 7a6b2caa20..937c27337b 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java
@@ -28,7 +28,7 @@
     public final ScheduledExecutorService executor;
 
     private InstanceCache() {
-        executor = Executors.newSingleThreadScheduledExecutor();;
+        executor = Executors.newSingleThreadScheduledExecutor();
     }
 
     public static InstanceCache getInstanceCache() {
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index a9e9177ebd..d302e6a8ea 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -148,11 +148,10 @@ public JavaInstanceRunnable(InstanceConfig instanceConfig,
                 instanceConfig.getFunctionDetails().getTenant(),
                 String.format("%s/%s", 
instanceConfig.getFunctionDetails().getTenant(),
                         instanceConfig.getFunctionDetails().getNamespace()),
-                String.format("%s/%s/%s", 
instanceConfig.getFunctionDetails().getTenant(),
-                        instanceConfig.getFunctionDetails().getNamespace(),
-                        instanceConfig.getFunctionDetails().getName()),
+                instanceConfig.getFunctionDetails().getName(),
                 String.valueOf(instanceConfig.getInstanceId()),
-                instanceConfig.getClusterName()
+                instanceConfig.getClusterName(),
+                
FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails())
         };
 
         // Declare function local collector registry so that it will not clash 
with other function instances'
diff --git a/pulsar-functions/instance/src/main/python/function_stats.py 
b/pulsar-functions/instance/src/main/python/function_stats.py
index 3d6e216a04..3ea0316190 100644
--- a/pulsar-functions/instance/src/main/python/function_stats.py
+++ b/pulsar-functions/instance/src/main/python/function_stats.py
@@ -20,12 +20,16 @@
 import traceback
 import time
 import util
+import sys
 
 from prometheus_client import Counter, Summary, Gauge
+from ratelimit import limits, RateLimitException
 
 # We keep track of the following metrics
 class Stats(object):
-  metrics_label_names = ['tenant', 'namespace', 'function', 'instance_id', 
'cluster']
+  metrics_label_names = ['tenant', 'namespace', 'function', 'instance_id', 
'cluster', 'fqfn']
+
+  exception_metrics_label_names = metrics_label_names + ['error', 'ts']
 
   PULSAR_FUNCTION_METRICS_PREFIX = "pulsar_function_"
   USER_METRIC_PREFIX = "user_metric_";
@@ -57,7 +61,6 @@ class Stats(object):
 
   stat_total_received = Counter(PULSAR_FUNCTION_METRICS_PREFIX + 
TOTAL_RECEIVED, 'Total number of messages received from source.', 
metrics_label_names)
 
-
   # 1min windowed metrics
   stat_total_processed_successfully_1min = 
Counter(PULSAR_FUNCTION_METRICS_PREFIX + TOTAL_SUCCESSFULLY_PROCESSED_1min,
                                               'Total number of messages 
processed successfully in the last 1 minute.', metrics_label_names)
@@ -74,6 +77,11 @@ class Stats(object):
   stat_total_received_1min = Counter(PULSAR_FUNCTION_METRICS_PREFIX + 
TOTAL_RECEIVED_1min,
                                 'Total number of messages received from source 
in the last 1 minute.', metrics_label_names)
 
+  # exceptions
+  user_exceptions = Gauge(PULSAR_FUNCTION_METRICS_PREFIX + 'user_exception', 
'Exception from user code.', exception_metrics_label_names)
+
+  system_exceptions = Gauge(PULSAR_FUNCTION_METRICS_PREFIX + 
'system_exception', 'Exception from system code.', 
exception_metrics_label_names)
+
   latest_user_exception = []
   latest_sys_exception = []
 
@@ -129,15 +137,15 @@ def incr_total_processed_successfully(self):
     self.stat_total_processed_successfully.labels(*self.metrics_labels).inc()
     
self.stat_total_processed_successfully_1min.labels(*self.metrics_labels).inc()
 
-  def incr_total_sys_exceptions(self):
+  def incr_total_sys_exceptions(self, exception):
     self.stat_total_sys_exceptions.labels(*self.metrics_labels).inc()
     self.stat_total_sys_exceptions_1min.labels(*self.metrics_labels).inc()
-    self.add_sys_exception()
+    self.add_sys_exception(exception)
 
-  def incr_total_user_exceptions(self):
+  def incr_total_user_exceptions(self, exception):
     self.stat_total_user_exceptions.labels(*self.metrics_labels).inc()
     self.stat_total_user_exceptions_1min.labels(*self.metrics_labels).inc()
-    self.add_user_exception()
+    self.add_user_exception(exception)
 
   def incr_total_received(self):
     self.stat_total_received.labels(*self.metrics_labels).inc()
@@ -155,16 +163,42 @@ def process_time_end(self):
   def set_last_invocation(self, time):
     self.stat_last_invocation.labels(*self.metrics_labels).set(time * 1000.0)
 
-  def add_user_exception(self):
-    self.latest_sys_exception.append((traceback.format_exc(), int(time.time() 
* 1000)))
+  def add_user_exception(self, exception):
+    error = traceback.format_exc()
+    ts = int(time.time() * 1000) if sys.version_info.major >= 3 else 
long(time.time() * 1000)
+    self.latest_sys_exception.append((error, ts))
     if len(self.latest_sys_exception) > 10:
       self.latest_sys_exception.pop(0)
 
-  def add_sys_exception(self):
-    self.latest_sys_exception.append((traceback.format_exc(), int(time.time() 
* 1000)))
+    # report exception via prometheus
+    try:
+      self.report_user_exception_prometheus(exception, ts)
+    except RateLimitException:
+      pass
+
+  @limits(calls=5, period=60)
+  def report_user_exception_prometheus(self, exception, ts):
+    exception_metric_labels = self.metrics_labels + [exception.message, 
str(ts)]
+    self.user_exceptions.labels(*exception_metric_labels).set(1.0)
+
+  def add_sys_exception(self, exception):
+    error = traceback.format_exc()
+    ts = int(time.time() * 1000) if sys.version_info.major >= 3 else 
long(time.time() * 1000)
+    self.latest_sys_exception.append((error, ts))
     if len(self.latest_sys_exception) > 10:
       self.latest_sys_exception.pop(0)
 
+    # report exception via prometheus
+    try:
+      self.report_system_exception_prometheus(exception, ts)
+    except RateLimitException:
+      pass
+
+  @limits(calls=5, period=60)
+  def report_system_exception_prometheus(self, exception, ts):
+    exception_metric_labels = self.metrics_labels + [exception.message, 
str(ts)]
+    self.system_exceptions.labels(*exception_metric_labels).set(1.0)
+
   def reset(self):
     self.latest_user_exception = []
     self.latest_sys_exception = []
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py 
b/pulsar-functions/instance/src/main/python/python_instance.py
index 0fc3601a6b..01593fa5c8 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -94,8 +94,9 @@ def __init__(self, instance_id, function_id, 
function_version, function_details,
     self.secrets_provider = secrets_provider
     self.metrics_labels = [function_details.tenant,
                            "%s/%s" % (function_details.tenant, 
function_details.namespace),
-                           "%s/%s/%s" % (function_details.tenant, 
function_details.namespace, function_details.name),
-                           instance_id, cluster_name]
+                           function_details.name,
+                           instance_id, cluster_name,
+                           "%s/%s/%s" % (function_details.tenant, 
function_details.namespace, function_details.name)]
     self.stats = Stats(self.metrics_labels)
 
   def health_check(self):
@@ -213,7 +214,7 @@ def actual_execution(self):
           self.stats.process_time_end()
         except Exception as e:
           Log.exception("Exception while executing user method")
-          self.stats.incr_total_user_exceptions()
+          self.stats.incr_total_user_exceptions(e)
 
         if self.log_topic_handler is not None:
           log.remove_all_handlers()
@@ -224,7 +225,7 @@ def actual_execution(self):
 
       except Exception as e:
         Log.error("Uncaught exception in Python instance: %s" % e);
-        self.stats.incr_total_sys_exceptions()
+        self.stats.incr_total_sys_exceptions(e)
 
   def done_producing(self, consumer, orig_message, result, sent_message):
     if result == pulsar.Result.Ok and self.auto_ack and self.atleast_once:


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to