jerrypeng closed pull request #3066: prometheus metrics for functions served 
via brokers or function instances should match
URL: https://github.com/apache/pulsar/pull/3066
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 9214d253db..f2869e69d8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -18,28 +18,9 @@
  */
 package org.apache.pulsar.io;
 
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
-import static org.mockito.Mockito.spy;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotEquals;
-import static org.testng.Assert.assertTrue;
-
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-
-import java.lang.reflect.Method;
-import java.net.InetAddress;
-import java.net.URL;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.ToString;
 import org.apache.bookkeeper.test.PortManager;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -65,6 +46,7 @@
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import 
org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
+import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
@@ -76,6 +58,33 @@
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.lang.reflect.Method;
+import java.net.HttpURLConnection;
+import java.net.InetAddress;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
+
 /**
  * Test Pulsar sink on function
  *
@@ -391,6 +400,79 @@ public void testPulsarFunctionStats() throws Exception {
         
assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(),
 functionStats.instances.get(0).getMetrics().oneMin.getAvgProcessLatency());
         
assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(),
 functionStats.getAvgProcessLatency());
 
+        // validate prometheus metrics empty
+        String prometheusMetrics = getPrometheusMetrics(brokerWebServicePort);
+        log.info("prometheus metrics: {}", prometheusMetrics);
+
+        Map<String, Metric> metrics = parseMetrics(prometheusMetrics);
+        Metric m = metrics.get("pulsar_function_received_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_function_received_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_function_user_exceptions_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_function_user_exceptions_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_function_process_latency_ms");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.value, Double.NaN);
+        m = metrics.get("pulsar_function_process_latency_ms_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.value, Double.NaN);
+        m = metrics.get("pulsar_function_system_exceptions_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_function_system_exceptions_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_function_last_invocation");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_function_processed_successfully_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_function_processed_successfully_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.value, 0.0);
+
+
         // validate function instance stats empty
         FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData 
functionInstanceStats = functionRuntimeManager.getFunctionInstanceStats(tenant, 
namespacePortion,
                 functionName, 0,  null);
@@ -464,6 +546,78 @@ public void testPulsarFunctionStats() throws Exception {
 
         assertEquals(functionInstanceStats, functionInstanceStatsAdmin);
         assertEquals(functionInstanceStats, 
functionStats.instances.get(0).getMetrics());
+
+        // validate prometheus metrics
+        prometheusMetrics = getPrometheusMetrics(brokerWebServicePort);
+        log.info("prometheus metrics: {}", prometheusMetrics);
+
+        metrics = parseMetrics(prometheusMetrics);
+        m = metrics.get("pulsar_function_received_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.value, (double) totalMsgs);
+        m = metrics.get("pulsar_function_received_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.value, (double) totalMsgs);
+        m = metrics.get("pulsar_function_user_exceptions_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_function_user_exceptions_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_function_process_latency_ms");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertTrue(m.value > 0.0);
+        m = metrics.get("pulsar_function_process_latency_ms_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertTrue(m.value > 0.0);
+        m = metrics.get("pulsar_function_system_exceptions_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_function_system_exceptions_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_function_last_invocation");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertTrue(m.value > 0.0);
+        m = metrics.get("pulsar_function_processed_successfully_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.value, (double) totalMsgs);
+        m = metrics.get("pulsar_function_processed_successfully_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.value, (double) totalMsgs);
     }
 
     @Test(timeOut = 20000)
@@ -628,4 +782,58 @@ public void testFunctionStopAndRestartApi() throws 
Exception {
 
         producer.close();
     }
+
+    public static String getPrometheusMetrics(int metricsPort) throws 
IOException {
+        StringBuilder result = new StringBuilder();
+        URL url = new URL(String.format("http://%s:%s/metrics", 
InetAddress.getLocalHost().getHostAddress(), metricsPort));
+        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+        conn.setRequestMethod("GET");
+        BufferedReader rd = new BufferedReader(new 
InputStreamReader(conn.getInputStream()));
+        String line;
+        while ((line = rd.readLine()) != null) {
+            result.append(line + System.lineSeparator());
+        }
+        rd.close();
+        return result.toString();
+    }
+
+    /**
+     * Hacky parsing of Prometheus text format. Should be good enough for unit 
tests
+     */
+    private static Map<String, Metric> parseMetrics(String metrics) {
+        Map<String, Metric> parsed = new HashMap<>();
+        // Example of lines are
+        // jvm_threads_current{cluster="standalone",} 203.0
+        // or
+        // pulsar_subscriptions_count{cluster="standalone", 
namespace="sample/standalone/ns1",
+        // topic="persistent://sample/standalone/ns1/test-2"} 0.0 1517945780897
+        Pattern pattern = 
Pattern.compile("^(\\w+)\\{([^\\}]+)\\}\\s(-?[\\d\\w\\.]+)(\\s(\\d+))?$");
+        Pattern tagsPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?");
+        Arrays.asList(metrics.split("\n")).forEach(line -> {
+            if (line.isEmpty() || line.startsWith("#")) {
+                return;
+            }
+            Matcher matcher = pattern.matcher(line);
+            checkArgument(matcher.matches());
+            String name = matcher.group(1);
+            Metric m = new Metric();
+            m.value = Double.valueOf(matcher.group(3));
+            String tags = matcher.group(2);
+            Matcher tagsMatcher = tagsPattern.matcher(tags);
+            while (tagsMatcher.find()) {
+                String tag = tagsMatcher.group(1);
+                String value = tagsMatcher.group(2);
+                m.tags.put(tag, value);
+            }
+            parsed.put(name, m);
+        });
+        return parsed;
+    }
+
+    @ToString
+    static class Metric {
+        Map<String, String> tags = new TreeMap<>();
+        double value;
+    }
+
 }
\ No newline at end of file
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
index 139208b77f..c1b7574199 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
@@ -23,11 +23,14 @@
 import io.prometheus.client.Counter;
 import io.prometheus.client.Gauge;
 import io.prometheus.client.Summary;
+import io.prometheus.client.exporter.common.TextFormat;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 
+import java.io.IOException;
+import java.io.StringWriter;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -89,6 +92,8 @@
 
     private ScheduledFuture<?> scheduledFuture;
 
+    private final CollectorRegistry collectorRegistry;
+
     @Getter
     private 
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> 
latestUserExceptions = EvictingQueue.create(10);
     @Getter
@@ -96,6 +101,8 @@
 
     public FunctionStatsManager(CollectorRegistry collectorRegistry, String[] 
metricsLabels, ScheduledExecutorService scheduledExecutorService) {
 
+        this.collectorRegistry = collectorRegistry;
+
         this.metricsLabels = metricsLabels;
 
         statTotalProcessedSuccessfully = Counter.build()
@@ -326,6 +333,14 @@ public void reset() {
         latestSystemExceptions.clear();
     }
 
+    public String getStatsAsString() throws IOException {
+        StringWriter outputWriter = new StringWriter();
+
+        TextFormat.write004(outputWriter, 
collectorRegistry.metricFamilySamples());
+
+        return outputWriter.toString();
+    }
+
     @Override
     public void close() {
         scheduledFuture.cancel(false);
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 1d70356077..a9e9177ebd 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -24,7 +24,6 @@
 import com.google.gson.reflect.TypeToken;
 import io.netty.buffer.ByteBuf;
 import io.prometheus.client.CollectorRegistry;
-import java.util.concurrent.TimeUnit;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -38,11 +37,8 @@
 import org.apache.bookkeeper.clients.exceptions.InternalServerException;
 import org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException;
 import org.apache.bookkeeper.clients.exceptions.StreamNotFoundException;
-import org.apache.bookkeeper.common.util.Backoff;
 import org.apache.bookkeeper.common.util.Backoff.Jitter;
 import org.apache.bookkeeper.common.util.Backoff.Jitter.Type;
-import org.apache.bookkeeper.common.util.Backoff.Policy;
-import org.apache.bookkeeper.common.util.Retries;
 import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
 import org.apache.bookkeeper.stream.proto.StorageType;
 import org.apache.bookkeeper.stream.proto.StreamConfiguration;
@@ -69,7 +65,6 @@
 import org.apache.pulsar.functions.sink.PulsarSinkDisable;
 import org.apache.pulsar.functions.source.PulsarSource;
 import org.apache.pulsar.functions.source.PulsarSourceConfig;
-import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.StateUtils;
@@ -86,6 +81,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
@@ -119,6 +115,7 @@
     private Throwable deathException;
 
     // function stats
+    @Getter
     private FunctionStatsManager stats;
 
     private Record<?> currentRecord;
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index 482943be2c..fd6c4c4439 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -47,6 +47,8 @@
 import io.kubernetes.client.models.V1StatefulSet;
 import io.kubernetes.client.models.V1StatefulSetSpec;
 import io.kubernetes.client.models.V1Toleration;
+
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -299,6 +301,11 @@ public void onSuccess(FunctionStatus t) {
         return retval;
     }
 
+    @Override
+    public String getPrometheusMetrics() throws IOException {
+        return RuntimeUtils.getPrometheusMetrics(METRICS_PORT);
+    }
+
     @Override
     public boolean isAlive() {
         return running;
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index e39c7ade73..4edac1784b 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -38,6 +38,7 @@
 import 
org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
 import org.apache.pulsar.functions.utils.Utils;
 
+import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
 import java.util.TimerTask;
@@ -58,6 +59,7 @@
     @Getter
     private List<String> processArgs;
     private int instancePort;
+    private int metricsPort;
     @Getter
     private Throwable deathException;
     private ManagedChannel channel;
@@ -81,6 +83,7 @@
                    Long expectedHealthCheckInterval) throws Exception {
         this.instanceConfig = instanceConfig;
         this.instancePort = instanceConfig.getPort();
+        this.metricsPort = Utils.findAvailablePort();
         this.expectedHealthCheckInterval = expectedHealthCheckInterval;
         this.secretsProviderConfigurator = secretsProviderConfigurator;
         String logConfigFile = null;
@@ -119,7 +122,7 @@
             false,
             null,
             null,
-                Utils.findAvailablePort());
+                this.metricsPort);
     }
 
     /**
@@ -268,6 +271,11 @@ public void onSuccess(InstanceCommunication.MetricsData t) 
{
         return retval;
     }
 
+    @Override
+    public String getPrometheusMetrics() throws IOException {
+        return RuntimeUtils.getPrometheusMetrics(metricsPort);
+    }
+
     public CompletableFuture<InstanceCommunication.HealthCheckResult> 
healthCheck() {
         CompletableFuture<InstanceCommunication.HealthCheckResult> retval = 
new CompletableFuture<>();
         if (stub == null) {
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
index ac1eceda7e..fafdca7123 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
@@ -21,6 +21,7 @@
 
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 
+import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -46,4 +47,5 @@
     
     CompletableFuture<InstanceCommunication.MetricsData> getMetrics();
 
+    String getPrometheusMetrics() throws IOException;
 }
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 05f21e614c..a5f55c57ee 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -20,6 +20,13 @@
 package org.apache.pulsar.functions.runtime;
 
 import com.google.protobuf.util.JsonFormat;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.InetAddress;
+import java.net.URL;
 import java.util.LinkedList;
 import java.util.List;
 import lombok.extern.slf4j.Slf4j;
@@ -186,4 +193,18 @@
         args.add(instanceConfig.getClusterName());
         return args;
     }
+
+    public static String getPrometheusMetrics(int metricsPort) throws 
IOException{
+        StringBuilder result = new StringBuilder();
+        URL url = new URL(String.format("http://%s:%s", 
InetAddress.getLocalHost().getHostAddress(), metricsPort));
+        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+        conn.setRequestMethod("GET");
+        BufferedReader rd = new BufferedReader(new 
InputStreamReader(conn.getInputStream()));
+        String line;
+        while ((line = rd.readLine()) != null) {
+            result.append(line + System.lineSeparator());
+        }
+        rd.close();
+        return result.toString();
+    }
 }
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
index 460cdb0cd8..be049c38f9 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
@@ -19,6 +19,7 @@
 
 package org.apache.pulsar.functions.runtime;
 
+import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 
 import io.prometheus.client.CollectorRegistry;
@@ -156,6 +157,11 @@ public void stop() {
         return 
CompletableFuture.completedFuture(javaInstanceRunnable.getMetrics());
     }
 
+    @Override
+    public String getPrometheusMetrics() throws IOException {
+        return javaInstanceRunnable.getStats().getStatsAsString();
+    }
+
     @Override
     public CompletableFuture<Void> resetMetrics() {
         javaInstanceRunnable.resetMetrics();
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
index 2b8e0fd595..cd186cc4b1 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
@@ -18,17 +18,15 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import org.apache.pulsar.functions.instance.FunctionStatsManager;
-import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory;
 import org.apache.pulsar.functions.runtime.Runtime;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
-import org.apache.pulsar.common.util.SimpleTextOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
 
 /**
  * A class to generate stats for pulsar functions running on this broker
@@ -57,31 +55,10 @@ public static void generate(WorkerService workerService, 
String cluster, SimpleT
                     Runtime functionRuntime = 
functionRuntimeSpawner.getRuntime();
                     if (functionRuntime != null) {
                         try {
-                            InstanceCommunication.MetricsData metrics = 
functionRuntime.getMetrics().get();
-
-                            String tenant = 
functionRuntimeInfo.getFunctionInstance()
-                                    
.getFunctionMetaData().getFunctionDetails().getTenant();
-                            String namespace = 
functionRuntimeInfo.getFunctionInstance()
-                                    
.getFunctionMetaData().getFunctionDetails().getNamespace();
-                            String name = 
functionRuntimeInfo.getFunctionInstance()
-                                    
.getFunctionMetaData().getFunctionDetails().getName();
-                            int instanceId = 
functionRuntimeInfo.getFunctionInstance().getInstanceId();
-                            String qualifiedNamespace = String.format("%s/%s", 
tenant, namespace);
-
-                            metric(out, cluster, qualifiedNamespace, name, 
FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + 
FunctionStatsManager.PROCESS_LATENCY_MS, instanceId, 
metrics.getAvgProcessLatency());
-                            metric(out, cluster, qualifiedNamespace, name, 
FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + 
FunctionStatsManager.LAST_INVOCATION, instanceId, metrics.getLastInvocation());
-                            metric(out, cluster, qualifiedNamespace, name, 
FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + 
FunctionStatsManager.PROCESSED_SUCCESSFULLY_TOTAL, instanceId, 
metrics.getProcessedSuccessfullyTotal());
-                            metric(out, cluster, qualifiedNamespace, name, 
FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + 
FunctionStatsManager.RECEIVED_TOTAL, instanceId, metrics.getReceivedTotal());
-                            metric(out, cluster, qualifiedNamespace, name, 
FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + 
FunctionStatsManager.SYSTEM_EXCEPTIONS_TOTAL, instanceId, 
metrics.getSystemExceptionsTotal());
-                            metric(out, cluster, qualifiedNamespace, name, 
FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + 
FunctionStatsManager.USER_EXCEPTIONS_TOTAL, instanceId, 
metrics.getUserExceptionsTotal());
 
-                            for (Map.Entry<String, Double> userMetricsMapEntry 
: metrics.getUserMetricsMap().entrySet()) {
-                                String userMetricName = 
userMetricsMapEntry.getKey();
-                                Double val = userMetricsMapEntry.getValue();
-                                metric(out, cluster, qualifiedNamespace, name, 
FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + userMetricName, 
instanceId, val);
-                            }
+                            out.write(functionRuntime.getPrometheusMetrics());
 
-                        } catch (InterruptedException | ExecutionException e) {
+                        } catch (IOException e) {
                             log.warn("Failed to collect metrics for function 
instance {}",
                                     fullyQualifiedInstanceName, e);
                         }
@@ -90,16 +67,4 @@ public static void generate(WorkerService workerService, 
String cluster, SimpleT
             }
         }
     }
-
-    private static void metricType(SimpleTextOutputStream stream, String name) 
{
-        stream.write("# TYPE ").write(name).write(" gauge\n");
-    }
-
-    private static void metric(SimpleTextOutputStream stream, String cluster, 
String namespace,
-                               String functionName, String metricName, int 
instanceId, double value) {
-        metricType(stream, metricName);
-        
stream.write(metricName).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
-                
.write("\",name=\"").write(functionName).write("\",instanceId=\"").write(instanceId).write("\"}
 ");
-        stream.write(value).write(' 
').write(System.currentTimeMillis()).write('\n');
-    }
 }
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
deleted file mode 100644
index 4b54bcfc98..0000000000
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.worker;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.Unpooled;
-import lombok.ToString;
-import org.apache.pulsar.common.util.SimpleTextOutputStream;
-import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory;
-import org.apache.pulsar.functions.runtime.Runtime;
-import org.apache.pulsar.functions.runtime.RuntimeSpawner;
-import org.apache.pulsar.functions.utils.Utils;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import java.nio.charset.Charset;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.CompletableFuture;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-
-public class FunctionStatsGeneratorTest {
-
-    @Test
-    public void testGenerateFunctionStatsWhenWorkerServiceIsNotInitialized() {
-        WorkerService workerService = mock(WorkerService.class);
-        when(workerService.isInitialized()).thenReturn(false);
-        FunctionsStatsGenerator.generate(
-            workerService, "test-cluster", new 
SimpleTextOutputStream(Unpooled.buffer()));
-        verify(workerService, times(1)).isInitialized();
-        verify(workerService, times(0)).getFunctionRuntimeManager();
-    }
-
-    @Test
-    public void testGenerateFunctionStatsOnK8SRuntimeFactory() {
-        WorkerService workerService = mock(WorkerService.class);
-        when(workerService.isInitialized()).thenReturn(true);
-        FunctionRuntimeManager frm = mock(FunctionRuntimeManager.class);
-        
when(frm.getRuntimeFactory()).thenReturn(mock(KubernetesRuntimeFactory.class));
-        when(workerService.getFunctionRuntimeManager()).thenReturn(frm);
-        FunctionsStatsGenerator.generate(
-            workerService, "test-cluster", new 
SimpleTextOutputStream(Unpooled.buffer()));
-        verify(workerService, times(1)).isInitialized();
-        verify(workerService, times(1)).getFunctionRuntimeManager();
-        verify(frm, times(0)).getFunctionRuntimeInfos();
-    }
-
-    @Test
-    public void testFunctionsStatsGenerate() {
-        FunctionRuntimeManager functionRuntimeManager = 
mock(FunctionRuntimeManager.class);
-        Map<String, FunctionRuntimeInfo> functionRuntimeInfoMap = new 
HashMap<>();
-
-        WorkerService workerService = mock(WorkerService.class);
-        
doReturn(functionRuntimeManager).when(workerService).getFunctionRuntimeManager();
-        doReturn(new WorkerConfig()).when(workerService).getWorkerConfig();
-        when(workerService.isInitialized()).thenReturn(true);
-
-        CompletableFuture<InstanceCommunication.MetricsData> 
metricsDataCompletableFuture = new CompletableFuture<>();
-        InstanceCommunication.MetricsData metricsData = 
InstanceCommunication.MetricsData.newBuilder()
-                .setReceivedTotal(101)
-                .setProcessedSuccessfullyTotal(99)
-                .setAvgProcessLatency(10.0)
-                .setUserExceptionsTotal(3)
-                .setSystemExceptionsTotal(1)
-                .setLastInvocation(1542324900)
-                .build();
-
-        metricsDataCompletableFuture.complete(metricsData);
-        Runtime runtime = mock(Runtime.class);
-        doReturn(metricsDataCompletableFuture).when(runtime).getMetrics();
-
-        RuntimeSpawner runtimeSpawner = mock(RuntimeSpawner.class);
-        doReturn(runtime).when(runtimeSpawner).getRuntime();
-
-        Function.FunctionMetaData function1 = 
Function.FunctionMetaData.newBuilder().setFunctionDetails(
-                Function.FunctionDetails.newBuilder()
-                        
.setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build();
-
-        Function.Instance instance = Function.Instance.newBuilder()
-                .setFunctionMetaData(function1).setInstanceId(0).build();
-
-        FunctionRuntimeInfo functionRuntimeInfo = 
mock(FunctionRuntimeInfo.class);
-        doReturn(runtimeSpawner).when(functionRuntimeInfo).getRuntimeSpawner();
-        doReturn(instance).when(functionRuntimeInfo).getFunctionInstance();
-
-        
functionRuntimeInfoMap.put(Utils.getFullyQualifiedInstanceId(instance), 
functionRuntimeInfo);
-        
doReturn(functionRuntimeInfoMap).when(functionRuntimeManager).getFunctionRuntimeInfos();
-
-        ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
-        SimpleTextOutputStream statsOut = new SimpleTextOutputStream(buf);
-        FunctionsStatsGenerator.generate(workerService, "default", statsOut);
-
-        String str = buf.toString(Charset.defaultCharset());
-
-        buf.release();
-        Map<String, Metric> metrics = parseMetrics(str);
-
-        Assert.assertEquals(metrics.size(), 6);
-
-        System.out.println("metrics: " + metrics);
-        Metric m = metrics.get("pulsar_function_received_total");
-        assertEquals(m.tags.get("cluster"), "default");
-        assertEquals(m.tags.get("instanceId"), "0");
-        assertEquals(m.tags.get("name"), "func-1");
-        assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
-        assertEquals(m.value, 101.0);
-
-        m = metrics.get("pulsar_function_user_exceptions_total");
-        assertEquals(m.tags.get("cluster"), "default");
-        assertEquals(m.tags.get("instanceId"), "0");
-        assertEquals(m.tags.get("name"), "func-1");
-        assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
-        assertEquals(m.value, 3.0);
-
-        m = metrics.get("pulsar_function_process_latency_ms");
-        assertEquals(m.tags.get("cluster"), "default");
-        assertEquals(m.tags.get("instanceId"), "0");
-        assertEquals(m.tags.get("name"), "func-1");
-        assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
-        assertEquals(m.value, 10.0);
-
-        m = metrics.get("pulsar_function_system_exceptions_total");
-        assertEquals(m.tags.get("cluster"), "default");
-        assertEquals(m.tags.get("instanceId"), "0");
-        assertEquals(m.tags.get("name"), "func-1");
-        assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
-        assertEquals(m.value, 1.0);
-
-        m = metrics.get("pulsar_function_last_invocation");
-        assertEquals(m.tags.get("cluster"), "default");
-        assertEquals(m.tags.get("instanceId"), "0");
-        assertEquals(m.tags.get("name"), "func-1");
-        assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
-        assertEquals(m.value, 1542324900.0);
-
-        m = metrics.get("pulsar_function_processed_successfully_total");
-        assertEquals(m.tags.get("cluster"), "default");
-        assertEquals(m.tags.get("instanceId"), "0");
-        assertEquals(m.tags.get("name"), "func-1");
-        assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
-        assertEquals(m.value, 99.0);
-    }
-
-    /**
-     * Hacky parsing of Prometheus text format. Sould be good enough for unit 
tests
-     */
-    private static Map<String, Metric> parseMetrics(String metrics) {
-        Map<String, Metric> parsed = new HashMap<>();
-
-        // Example of lines are
-        // jvm_threads_current{cluster="standalone",} 203.0
-        // or
-        // pulsar_subscriptions_count{cluster="standalone", 
namespace="sample/standalone/ns1",
-        // topic="persistent://sample/standalone/ns1/test-2"} 0.0 1517945780897
-        Pattern pattern = 
Pattern.compile("^(\\w+)\\{([^\\}]+)\\}\\s(-?[\\d\\w\\.]+)(\\s(\\d+))?$");
-        Pattern tagsPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?");
-
-        Arrays.asList(metrics.split("\n")).forEach(line -> {
-            if (line.isEmpty() || line.startsWith("#")) {
-                return;
-            }
-            Matcher matcher = pattern.matcher(line);
-
-            checkArgument(matcher.matches());
-            String name = matcher.group(1);
-
-            Metric m = new Metric();
-            m.value = Double.valueOf(matcher.group(3));
-
-            String tags = matcher.group(2);
-            Matcher tagsMatcher = tagsPattern.matcher(tags);
-            while (tagsMatcher.find()) {
-                String tag = tagsMatcher.group(1);
-                String value = tagsMatcher.group(2);
-                m.tags.put(tag, value);
-            }
-
-            parsed.put(name, m);
-        });
-
-        return parsed;
-    }
-
-    @ToString
-    static class Metric {
-        Map<String, String> tags = new TreeMap<>();
-        double value;
-    }
-
-}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to