This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8d24102 prometheus metrics for functions served via brokers or
function instances should match (#3066)
8d24102 is described below
commit 8d241022724fff411f6841ff84b970f01d5dbd18
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Tue Nov 27 14:49:01 2018 -0800
prometheus metrics for functions served via brokers or function instances
should match (#3066)
* prometheus metrics for functions served via brokers or instances
themselves should match
* add additional testing
---
.../apache/pulsar/io/PulsarFunctionE2ETest.java | 248 +++++++++++++++++++--
.../functions/instance/FunctionStatsManager.java | 15 ++
.../functions/instance/JavaInstanceRunnable.java | 7 +-
.../functions/runtime/KubernetesRuntime.java | 7 +
.../pulsar/functions/runtime/ProcessRuntime.java | 10 +-
.../apache/pulsar/functions/runtime/Runtime.java | 2 +
.../pulsar/functions/runtime/RuntimeUtils.java | 21 ++
.../pulsar/functions/runtime/ThreadRuntime.java | 6 +
.../functions/worker/FunctionsStatsGenerator.java | 43 +---
.../worker/FunctionStatsGeneratorTest.java | 220 ------------------
10 files changed, 294 insertions(+), 285 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 9214d25..f2869e6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -18,28 +18,9 @@
*/
package org.apache.pulsar.io;
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import static
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
-import static org.mockito.Mockito.spy;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotEquals;
-import static org.testng.Assert.assertTrue;
-
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-
-import java.lang.reflect.Method;
-import java.net.InetAddress;
-import java.net.URL;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.ToString;
import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -65,6 +46,7 @@ import
org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import
org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
+import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
@@ -76,6 +58,33 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.lang.reflect.Method;
+import java.net.HttpURLConnection;
+import java.net.InetAddress;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
+
/**
* Test Pulsar sink on function
*
@@ -391,6 +400,79 @@ public class PulsarFunctionE2ETest {
assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(),
functionStats.instances.get(0).getMetrics().oneMin.getAvgProcessLatency());
assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(),
functionStats.getAvgProcessLatency());
+ // validate prometheus metrics empty
+ String prometheusMetrics = getPrometheusMetrics(brokerWebServicePort);
+ log.info("prometheus metrics: {}", prometheusMetrics);
+
+ Map<String, Metric> metrics = parseMetrics(prometheusMetrics);
+ Metric m = metrics.get("pulsar_function_received_total");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_function_received_total_1min");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_function_user_exceptions_total");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_function_user_exceptions_total_1min");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_function_process_latency_ms");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.value, Double.NaN);
+ m = metrics.get("pulsar_function_process_latency_ms_1min");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.value, Double.NaN);
+ m = metrics.get("pulsar_function_system_exceptions_total");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_function_system_exceptions_total_1min");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_function_last_invocation");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_function_processed_successfully_total");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_function_processed_successfully_total_1min");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.value, 0.0);
+
+
// validate function instance stats empty
FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData
functionInstanceStats = functionRuntimeManager.getFunctionInstanceStats(tenant,
namespacePortion,
functionName, 0, null);
@@ -464,6 +546,78 @@ public class PulsarFunctionE2ETest {
assertEquals(functionInstanceStats, functionInstanceStatsAdmin);
assertEquals(functionInstanceStats,
functionStats.instances.get(0).getMetrics());
+
+ // validate prometheus metrics
+ prometheusMetrics = getPrometheusMetrics(brokerWebServicePort);
+ log.info("prometheus metrics: {}", prometheusMetrics);
+
+ metrics = parseMetrics(prometheusMetrics);
+ m = metrics.get("pulsar_function_received_total");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.value, (double) totalMsgs);
+ m = metrics.get("pulsar_function_received_total_1min");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.value, (double) totalMsgs);
+ m = metrics.get("pulsar_function_user_exceptions_total");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_function_user_exceptions_total_1min");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_function_process_latency_ms");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertTrue(m.value > 0.0);
+ m = metrics.get("pulsar_function_process_latency_ms_1min");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertTrue(m.value > 0.0);
+ m = metrics.get("pulsar_function_system_exceptions_total");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_function_system_exceptions_total_1min");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_function_last_invocation");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertTrue(m.value > 0.0);
+ m = metrics.get("pulsar_function_processed_successfully_total");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.value, (double) totalMsgs);
+ m = metrics.get("pulsar_function_processed_successfully_total_1min");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.value, (double) totalMsgs);
}
@Test(timeOut = 20000)
@@ -628,4 +782,58 @@ public class PulsarFunctionE2ETest {
producer.close();
}
+
+    /**
+     * Fetches the Prometheus {@code /metrics} page served on the given port of the local host.
+     *
+     * @param metricsPort port of the web service exposing {@code /metrics}
+     * @return the response body, each line terminated by {@code '\n'}
+     * @throws IOException if connecting, sending the request, or reading the response fails
+     */
+    public static String getPrometheusMetrics(int metricsPort) throws IOException {
+        URL url = new URL(String.format("http://%s:%s/metrics",
+                InetAddress.getLocalHost().getHostAddress(), metricsPort));
+        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+        try {
+            conn.setRequestMethod("GET");
+            StringBuilder result = new StringBuilder();
+            // Prometheus text exposition is UTF-8; do not rely on the platform default charset.
+            // try-with-resources closes the stream even if readLine() fails part-way through.
+            try (BufferedReader rd = new BufferedReader(new InputStreamReader(
+                    conn.getInputStream(), java.nio.charset.StandardCharsets.UTF_8))) {
+                String line;
+                while ((line = rd.readLine()) != null) {
+                    // '\n' is the exposition format's separator; using it instead of the platform
+                    // separator keeps parseMetrics' line splitting OS-independent.
+                    result.append(line).append('\n');
+                }
+            }
+            return result.toString();
+        } finally {
+            conn.disconnect();
+        }
+    }
+
+    // A sample line: metric name, optional {labels}, value, optional timestamp.
+    // Groups: 1 = name, 2 = label block (null when absent), 3 = value, 5 = timestamp.
+    private static final Pattern METRIC_LINE_PATTERN =
+            Pattern.compile("^([\\w:]+)(?:\\{([^}]*)\\})?\\s+(\\S+)(\\s+(-?\\d+))?$");
+    // One label pair inside the braces: key="value", optionally followed by a comma.
+    private static final Pattern METRIC_TAG_PATTERN =
+            Pattern.compile("(\\w+)=\"([^\"]*)\"(,\\s?)?");
+
+    /**
+     * Hacky parsing of the Prometheus text format. Should be good enough for unit tests.
+     *
+     * <p>Comment lines ({@code #}) and blank lines are skipped. Only one {@link Metric} is kept
+     * per metric name: when several samples share a name (different label sets, summary
+     * quantiles, ...) the last one wins.
+     *
+     * @param metrics raw Prometheus text exposition
+     * @return map from metric name to its last parsed sample
+     * @throws IllegalArgumentException if a non-comment, non-blank line cannot be parsed
+     */
+    private static Map<String, Metric> parseMetrics(String metrics) {
+        Map<String, Metric> parsed = new HashMap<>();
+        // Example of lines are
+        // jvm_threads_current{cluster="standalone",} 203.0
+        // or
+        // pulsar_subscriptions_count{cluster="standalone", namespace="sample/standalone/ns1",
+        // topic="persistent://sample/standalone/ns1/test-2"} 0.0 1517945780897
+        // Split on any line break (\R) so CRLF input does not leave a trailing '\r' that would
+        // make the anchored line pattern fail.
+        for (String line : metrics.split("\\R")) {
+            if (line.trim().isEmpty() || line.startsWith("#")) {
+                continue;
+            }
+            Matcher matcher = METRIC_LINE_PATTERN.matcher(line);
+            checkArgument(matcher.matches(), "Unparseable Prometheus metric line: %s", line);
+            Metric m = new Metric();
+            m.value = parsePrometheusValue(matcher.group(3));
+            String tags = matcher.group(2);
+            if (tags != null) {
+                Matcher tagsMatcher = METRIC_TAG_PATTERN.matcher(tags);
+                while (tagsMatcher.find()) {
+                    m.tags.put(tagsMatcher.group(1), tagsMatcher.group(2));
+                }
+            }
+            parsed.put(matcher.group(1), m);
+        }
+        return parsed;
+    }
+
+    /** Parses a Prometheus sample value, including the format's +Inf/-Inf spellings. */
+    private static double parsePrometheusValue(String raw) {
+        switch (raw) {
+            case "+Inf":
+            case "Inf":
+                return Double.POSITIVE_INFINITY;
+            case "-Inf":
+                return Double.NEGATIVE_INFINITY;
+            default:
+                // Covers plain and scientific notation (e.g. 1.5E-4) as well as "NaN".
+                return Double.parseDouble(raw);
+        }
+    }
+
+    /** One parsed Prometheus sample: its label set and its numeric value. */
+    @ToString
+    static class Metric {
+        // Label name -> label value. The map itself is never replaced, only filled in; a TreeMap
+        // keeps the toString() output ordered by label name.
+        final Map<String, String> tags = new TreeMap<>();
+        double value;
+    }
+
}
\ No newline at end of file
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
index 139208b..c1b7574 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
@@ -23,11 +23,14 @@ import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import io.prometheus.client.Summary;
+import io.prometheus.client.exporter.common.TextFormat;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.proto.InstanceCommunication;
+import java.io.IOException;
+import java.io.StringWriter;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -89,6 +92,8 @@ public class FunctionStatsManager implements AutoCloseable {
private ScheduledFuture<?> scheduledFuture;
+ private final CollectorRegistry collectorRegistry;
+
@Getter
private
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
latestUserExceptions = EvictingQueue.create(10);
@Getter
@@ -96,6 +101,8 @@ public class FunctionStatsManager implements AutoCloseable {
public FunctionStatsManager(CollectorRegistry collectorRegistry, String[]
metricsLabels, ScheduledExecutorService scheduledExecutorService) {
+ this.collectorRegistry = collectorRegistry;
+
this.metricsLabels = metricsLabels;
statTotalProcessedSuccessfully = Counter.build()
@@ -326,6 +333,14 @@ public class FunctionStatsManager implements AutoCloseable
{
latestSystemExceptions.clear();
}
+    /**
+     * Serializes every metric family in the {@link CollectorRegistry} this manager was
+     * constructed with, using the Prometheus text exposition format version 0.0.4.
+     *
+     * <p>NOTE(review): if that registry is shared with other collectors, their samples are
+     * included as well — confirm this is intended.
+     *
+     * @return the exposition text
+     * @throws IOException if writing the samples fails
+     */
+    public String getStatsAsString() throws IOException {
+        StringWriter writer = new StringWriter();
+        TextFormat.write004(writer, collectorRegistry.metricFamilySamples());
+        return writer.toString();
+    }
+
+
@Override
public void close() {
scheduledFuture.cancel(false);
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 1d70356..a9e9177 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -24,7 +24,6 @@ import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import io.netty.buffer.ByteBuf;
import io.prometheus.client.CollectorRegistry;
-import java.util.concurrent.TimeUnit;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -38,11 +37,8 @@ import
org.apache.bookkeeper.clients.exceptions.ClientException;
import org.apache.bookkeeper.clients.exceptions.InternalServerException;
import org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException;
import org.apache.bookkeeper.clients.exceptions.StreamNotFoundException;
-import org.apache.bookkeeper.common.util.Backoff;
import org.apache.bookkeeper.common.util.Backoff.Jitter;
import org.apache.bookkeeper.common.util.Backoff.Jitter.Type;
-import org.apache.bookkeeper.common.util.Backoff.Policy;
-import org.apache.bookkeeper.common.util.Retries;
import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
import org.apache.bookkeeper.stream.proto.StorageType;
import org.apache.bookkeeper.stream.proto.StreamConfiguration;
@@ -69,7 +65,6 @@ import org.apache.pulsar.functions.sink.PulsarSinkConfig;
import org.apache.pulsar.functions.sink.PulsarSinkDisable;
import org.apache.pulsar.functions.source.PulsarSource;
import org.apache.pulsar.functions.source.PulsarSourceConfig;
-import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.StateUtils;
@@ -86,6 +81,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
import static
org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
@@ -119,6 +115,7 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
private Throwable deathException;
// function stats
+ @Getter
private FunctionStatsManager stats;
private Record<?> currentRecord;
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index 482943b..fd6c4c4 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -47,6 +47,8 @@ import io.kubernetes.client.models.V1ServiceSpec;
import io.kubernetes.client.models.V1StatefulSet;
import io.kubernetes.client.models.V1StatefulSetSpec;
import io.kubernetes.client.models.V1Toleration;
+
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -300,6 +302,11 @@ class KubernetesRuntime implements Runtime {
}
@Override
+ public String getPrometheusMetrics() throws IOException {
+     // Returns the instance's Prometheus text by scraping METRICS_PORT through RuntimeUtils.
+     // NOTE(review): RuntimeUtils.getPrometheusMetrics connects to this process's own
+     // InetAddress.getLocalHost() address. If the function instance runs in a separate
+     // Kubernetes pod, that is presumably not where it listens — confirm whether the
+     // pod/service address should be used instead.
+ return RuntimeUtils.getPrometheusMetrics(METRICS_PORT);
+ }
+
+ @Override
public boolean isAlive() {
return running;
}
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index e39c7ad..4edac17 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.functions.proto.InstanceControlGrpc;
import
org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.Utils;
+import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.TimerTask;
@@ -58,6 +59,7 @@ class ProcessRuntime implements Runtime {
@Getter
private List<String> processArgs;
private int instancePort;
+ private int metricsPort;
@Getter
private Throwable deathException;
private ManagedChannel channel;
@@ -81,6 +83,7 @@ class ProcessRuntime implements Runtime {
Long expectedHealthCheckInterval) throws Exception {
this.instanceConfig = instanceConfig;
this.instancePort = instanceConfig.getPort();
+ this.metricsPort = Utils.findAvailablePort();
this.expectedHealthCheckInterval = expectedHealthCheckInterval;
this.secretsProviderConfigurator = secretsProviderConfigurator;
String logConfigFile = null;
@@ -119,7 +122,7 @@ class ProcessRuntime implements Runtime {
false,
null,
null,
- Utils.findAvailablePort());
+ this.metricsPort);
}
/**
@@ -268,6 +271,11 @@ class ProcessRuntime implements Runtime {
return retval;
}
+    /**
+     * Scrapes the Prometheus endpoint of the instance managed by this runtime. The port
+     * used is the metrics port chosen with {@code Utils.findAvailablePort()} in the constructor.
+     */
+    @Override
+    public String getPrometheusMetrics() throws IOException {
+        final int port = metricsPort;
+        return RuntimeUtils.getPrometheusMetrics(port);
+    }
+
public CompletableFuture<InstanceCommunication.HealthCheckResult>
healthCheck() {
CompletableFuture<InstanceCommunication.HealthCheckResult> retval =
new CompletableFuture<>();
if (stub == null) {
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
index ac1eced..fafdca7 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.functions.runtime;
import org.apache.pulsar.functions.proto.InstanceCommunication;
+import java.io.IOException;
import java.util.concurrent.CompletableFuture;
/**
@@ -46,4 +47,5 @@ public interface Runtime {
CompletableFuture<InstanceCommunication.MetricsData> getMetrics();
+    /**
+     * Returns the metrics of the function instance behind this runtime, rendered in the
+     * Prometheus text exposition format. Implementations either scrape the instance's metrics
+     * HTTP port or serialize the in-process stats registry.
+     *
+     * @return Prometheus exposition text
+     * @throws IOException if the metrics cannot be fetched or serialized
+     */
+ String getPrometheusMetrics() throws IOException;
}
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 05f21e6..a5f55c5 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -20,6 +20,13 @@
package org.apache.pulsar.functions.runtime;
import com.google.protobuf.util.JsonFormat;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.InetAddress;
+import java.net.URL;
import java.util.LinkedList;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
@@ -186,4 +193,18 @@ class RuntimeUtils {
args.add(instanceConfig.getClusterName());
return args;
}
+
+    /** Connect and read timeout, in milliseconds, for scraping an instance's metrics port. */
+    private static final int METRICS_SCRAPE_TIMEOUT_MS = 10_000;
+
+    /**
+     * Fetches the Prometheus text served over HTTP on the given port of the local host.
+     *
+     * <p>The result is embedded in the broker/worker {@code /metrics} output, so lines are
+     * terminated with {@code '\n'} as the exposition format requires, regardless of platform.
+     *
+     * @param metricsPort port of the instance's metrics HTTP server
+     * @return the response body, each line terminated by {@code '\n'}
+     * @throws IOException if connecting, sending the request, or reading fails or times out
+     */
+    public static String getPrometheusMetrics(int metricsPort) throws IOException {
+        URL url = new URL(String.format("http://%s:%s",
+                InetAddress.getLocalHost().getHostAddress(), metricsPort));
+        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+        try {
+            conn.setRequestMethod("GET");
+            // Bound the scrape so one unresponsive instance cannot hang the caller indefinitely.
+            conn.setConnectTimeout(METRICS_SCRAPE_TIMEOUT_MS);
+            conn.setReadTimeout(METRICS_SCRAPE_TIMEOUT_MS);
+            StringBuilder result = new StringBuilder();
+            // try-with-resources closes the stream even if readLine() fails part-way through.
+            try (BufferedReader rd = new BufferedReader(new InputStreamReader(
+                    conn.getInputStream(), java.nio.charset.StandardCharsets.UTF_8))) {
+                String line;
+                while ((line = rd.readLine()) != null) {
+                    result.append(line).append('\n');
+                }
+            }
+            return result.toString();
+        } finally {
+            conn.disconnect();
+        }
+    }
}
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
index 460cdb0..be049c3 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.functions.runtime;
+import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import io.prometheus.client.CollectorRegistry;
@@ -157,6 +158,11 @@ class ThreadRuntime implements Runtime {
}
@Override
+    public String getPrometheusMetrics() throws IOException {
+        // Serializes the in-process stats registry of the wrapped JavaInstanceRunnable.
+        // JavaInstanceRunnable.stats has no initializer, so it can still be null before the
+        // instance has been set up. Report no samples in that case instead of throwing an NPE
+        // that the IOException-only handler in FunctionsStatsGenerator would not catch.
+        // NOTE(review): if thread-runtime instances share one CollectorRegistry, each instance
+        // would emit every instance's samples — confirm.
+        org.apache.pulsar.functions.instance.FunctionStatsManager stats =
+                javaInstanceRunnable.getStats();
+        return stats == null ? "" : stats.getStatsAsString();
+    }
+
+ @Override
public CompletableFuture<Void> resetMetrics() {
javaInstanceRunnable.resetMetrics();
return CompletableFuture.completedFuture(null);
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
index 2b8e0fd..cd186cc 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
@@ -18,17 +18,15 @@
*/
package org.apache.pulsar.functions.worker;
-import org.apache.pulsar.functions.instance.FunctionStatsManager;
-import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory;
import org.apache.pulsar.functions.runtime.Runtime;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
-import org.apache.pulsar.common.util.SimpleTextOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
/**
* A class to generate stats for pulsar functions running on this broker
@@ -57,31 +55,10 @@ public class FunctionsStatsGenerator {
Runtime functionRuntime =
functionRuntimeSpawner.getRuntime();
if (functionRuntime != null) {
try {
- InstanceCommunication.MetricsData metrics =
functionRuntime.getMetrics().get();
-
- String tenant =
functionRuntimeInfo.getFunctionInstance()
-
.getFunctionMetaData().getFunctionDetails().getTenant();
- String namespace =
functionRuntimeInfo.getFunctionInstance()
-
.getFunctionMetaData().getFunctionDetails().getNamespace();
- String name =
functionRuntimeInfo.getFunctionInstance()
-
.getFunctionMetaData().getFunctionDetails().getName();
- int instanceId =
functionRuntimeInfo.getFunctionInstance().getInstanceId();
- String qualifiedNamespace = String.format("%s/%s",
tenant, namespace);
-
- metric(out, cluster, qualifiedNamespace, name,
FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX +
FunctionStatsManager.PROCESS_LATENCY_MS, instanceId,
metrics.getAvgProcessLatency());
- metric(out, cluster, qualifiedNamespace, name,
FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX +
FunctionStatsManager.LAST_INVOCATION, instanceId, metrics.getLastInvocation());
- metric(out, cluster, qualifiedNamespace, name,
FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX +
FunctionStatsManager.PROCESSED_SUCCESSFULLY_TOTAL, instanceId,
metrics.getProcessedSuccessfullyTotal());
- metric(out, cluster, qualifiedNamespace, name,
FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX +
FunctionStatsManager.RECEIVED_TOTAL, instanceId, metrics.getReceivedTotal());
- metric(out, cluster, qualifiedNamespace, name,
FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX +
FunctionStatsManager.SYSTEM_EXCEPTIONS_TOTAL, instanceId,
metrics.getSystemExceptionsTotal());
- metric(out, cluster, qualifiedNamespace, name,
FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX +
FunctionStatsManager.USER_EXCEPTIONS_TOTAL, instanceId,
metrics.getUserExceptionsTotal());
- for (Map.Entry<String, Double> userMetricsMapEntry
: metrics.getUserMetricsMap().entrySet()) {
- String userMetricName =
userMetricsMapEntry.getKey();
- Double val = userMetricsMapEntry.getValue();
- metric(out, cluster, qualifiedNamespace, name,
FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + userMetricName,
instanceId, val);
- }
+ out.write(functionRuntime.getPrometheusMetrics());
- } catch (InterruptedException | ExecutionException e) {
+ } catch (IOException e) {
log.warn("Failed to collect metrics for function
instance {}",
fullyQualifiedInstanceName, e);
}
@@ -90,16 +67,4 @@ public class FunctionsStatsGenerator {
}
}
}
-
- private static void metricType(SimpleTextOutputStream stream, String name)
{
- stream.write("# TYPE ").write(name).write(" gauge\n");
- }
-
- private static void metric(SimpleTextOutputStream stream, String cluster,
String namespace,
- String functionName, String metricName, int
instanceId, double value) {
- metricType(stream, metricName);
-
stream.write(metricName).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
-
.write("\",name=\"").write(functionName).write("\",instanceId=\"").write(instanceId).write("\"}
");
- stream.write(value).write('
').write(System.currentTimeMillis()).write('\n');
- }
}
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
deleted file mode 100644
index 4b54bcf..0000000
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.worker;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.Unpooled;
-import lombok.ToString;
-import org.apache.pulsar.common.util.SimpleTextOutputStream;
-import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory;
-import org.apache.pulsar.functions.runtime.Runtime;
-import org.apache.pulsar.functions.runtime.RuntimeSpawner;
-import org.apache.pulsar.functions.utils.Utils;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import java.nio.charset.Charset;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.CompletableFuture;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-
-public class FunctionStatsGeneratorTest {
-
- @Test
- public void testGenerateFunctionStatsWhenWorkerServiceIsNotInitialized() {
- WorkerService workerService = mock(WorkerService.class);
- when(workerService.isInitialized()).thenReturn(false);
- FunctionsStatsGenerator.generate(
- workerService, "test-cluster", new
SimpleTextOutputStream(Unpooled.buffer()));
- verify(workerService, times(1)).isInitialized();
- verify(workerService, times(0)).getFunctionRuntimeManager();
- }
-
- @Test
- public void testGenerateFunctionStatsOnK8SRuntimeFactory() {
- WorkerService workerService = mock(WorkerService.class);
- when(workerService.isInitialized()).thenReturn(true);
- FunctionRuntimeManager frm = mock(FunctionRuntimeManager.class);
-
when(frm.getRuntimeFactory()).thenReturn(mock(KubernetesRuntimeFactory.class));
- when(workerService.getFunctionRuntimeManager()).thenReturn(frm);
- FunctionsStatsGenerator.generate(
- workerService, "test-cluster", new
SimpleTextOutputStream(Unpooled.buffer()));
- verify(workerService, times(1)).isInitialized();
- verify(workerService, times(1)).getFunctionRuntimeManager();
- verify(frm, times(0)).getFunctionRuntimeInfos();
- }
-
- @Test
- public void testFunctionsStatsGenerate() {
- FunctionRuntimeManager functionRuntimeManager =
mock(FunctionRuntimeManager.class);
- Map<String, FunctionRuntimeInfo> functionRuntimeInfoMap = new
HashMap<>();
-
- WorkerService workerService = mock(WorkerService.class);
-
doReturn(functionRuntimeManager).when(workerService).getFunctionRuntimeManager();
- doReturn(new WorkerConfig()).when(workerService).getWorkerConfig();
- when(workerService.isInitialized()).thenReturn(true);
-
- CompletableFuture<InstanceCommunication.MetricsData>
metricsDataCompletableFuture = new CompletableFuture<>();
- InstanceCommunication.MetricsData metricsData =
InstanceCommunication.MetricsData.newBuilder()
- .setReceivedTotal(101)
- .setProcessedSuccessfullyTotal(99)
- .setAvgProcessLatency(10.0)
- .setUserExceptionsTotal(3)
- .setSystemExceptionsTotal(1)
- .setLastInvocation(1542324900)
- .build();
-
- metricsDataCompletableFuture.complete(metricsData);
- Runtime runtime = mock(Runtime.class);
- doReturn(metricsDataCompletableFuture).when(runtime).getMetrics();
-
- RuntimeSpawner runtimeSpawner = mock(RuntimeSpawner.class);
- doReturn(runtime).when(runtimeSpawner).getRuntime();
-
- Function.FunctionMetaData function1 =
Function.FunctionMetaData.newBuilder().setFunctionDetails(
- Function.FunctionDetails.newBuilder()
-
.setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build();
-
- Function.Instance instance = Function.Instance.newBuilder()
- .setFunctionMetaData(function1).setInstanceId(0).build();
-
- FunctionRuntimeInfo functionRuntimeInfo =
mock(FunctionRuntimeInfo.class);
- doReturn(runtimeSpawner).when(functionRuntimeInfo).getRuntimeSpawner();
- doReturn(instance).when(functionRuntimeInfo).getFunctionInstance();
-
-
functionRuntimeInfoMap.put(Utils.getFullyQualifiedInstanceId(instance),
functionRuntimeInfo);
-
doReturn(functionRuntimeInfoMap).when(functionRuntimeManager).getFunctionRuntimeInfos();
-
- ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
- SimpleTextOutputStream statsOut = new SimpleTextOutputStream(buf);
- FunctionsStatsGenerator.generate(workerService, "default", statsOut);
-
- String str = buf.toString(Charset.defaultCharset());
-
- buf.release();
- Map<String, Metric> metrics = parseMetrics(str);
-
- Assert.assertEquals(metrics.size(), 6);
-
- System.out.println("metrics: " + metrics);
- Metric m = metrics.get("pulsar_function_received_total");
- assertEquals(m.tags.get("cluster"), "default");
- assertEquals(m.tags.get("instanceId"), "0");
- assertEquals(m.tags.get("name"), "func-1");
- assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
- assertEquals(m.value, 101.0);
-
- m = metrics.get("pulsar_function_user_exceptions_total");
- assertEquals(m.tags.get("cluster"), "default");
- assertEquals(m.tags.get("instanceId"), "0");
- assertEquals(m.tags.get("name"), "func-1");
- assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
- assertEquals(m.value, 3.0);
-
- m = metrics.get("pulsar_function_process_latency_ms");
- assertEquals(m.tags.get("cluster"), "default");
- assertEquals(m.tags.get("instanceId"), "0");
- assertEquals(m.tags.get("name"), "func-1");
- assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
- assertEquals(m.value, 10.0);
-
- m = metrics.get("pulsar_function_system_exceptions_total");
- assertEquals(m.tags.get("cluster"), "default");
- assertEquals(m.tags.get("instanceId"), "0");
- assertEquals(m.tags.get("name"), "func-1");
- assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
- assertEquals(m.value, 1.0);
-
- m = metrics.get("pulsar_function_last_invocation");
- assertEquals(m.tags.get("cluster"), "default");
- assertEquals(m.tags.get("instanceId"), "0");
- assertEquals(m.tags.get("name"), "func-1");
- assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
- assertEquals(m.value, 1542324900.0);
-
- m = metrics.get("pulsar_function_processed_successfully_total");
- assertEquals(m.tags.get("cluster"), "default");
- assertEquals(m.tags.get("instanceId"), "0");
- assertEquals(m.tags.get("name"), "func-1");
- assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
- assertEquals(m.value, 99.0);
- }
-
- /**
- * Hacky parsing of Prometheus text format. Sould be good enough for unit
tests
- */
- private static Map<String, Metric> parseMetrics(String metrics) {
- Map<String, Metric> parsed = new HashMap<>();
-
- // Example of lines are
- // jvm_threads_current{cluster="standalone",} 203.0
- // or
- // pulsar_subscriptions_count{cluster="standalone",
namespace="sample/standalone/ns1",
- // topic="persistent://sample/standalone/ns1/test-2"} 0.0 1517945780897
- Pattern pattern =
Pattern.compile("^(\\w+)\\{([^\\}]+)\\}\\s(-?[\\d\\w\\.]+)(\\s(\\d+))?$");
- Pattern tagsPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?");
-
- Arrays.asList(metrics.split("\n")).forEach(line -> {
- if (line.isEmpty() || line.startsWith("#")) {
- return;
- }
- Matcher matcher = pattern.matcher(line);
-
- checkArgument(matcher.matches());
- String name = matcher.group(1);
-
- Metric m = new Metric();
- m.value = Double.valueOf(matcher.group(3));
-
- String tags = matcher.group(2);
- Matcher tagsMatcher = tagsPattern.matcher(tags);
- while (tagsMatcher.find()) {
- String tag = tagsMatcher.group(1);
- String value = tagsMatcher.group(2);
- m.tags.put(tag, value);
- }
-
- parsed.put(name, m);
- });
-
- return parsed;
- }
-
- @ToString
- static class Metric {
- Map<String, String> tags = new TreeMap<>();
- double value;
- }
-
-}