This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 900e747 Expose metrics via http port in function instance (#2930)
900e747 is described below
commit 900e747908cfc916f9ff8ea5ff24041133c4ac99
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Thu Nov 8 12:16:07 2018 -0800
Expose metrics via http port in function instance (#2930)
* fix bugs in python metrics
* instance expose metrics
* remove commented out code
* fix unit tests
* remove commented out code
* fixing test
* fix python instance test
* removing old code
* fix bug
* refactoring java metrics
* refactoring python metrics
* cleaning up code
* removing unnecessary code
* improving metrics format
* fixing test
* fix bugs and revising format
* fix bug
* fix for python3
* change user defined metric to summary
* renaming labels
* change back python
---
.../pulsar/functions/instance/ContextImpl.java | 73 +++++++++----
.../pulsar/functions/instance/FunctionStats.java | 58 ++++++++---
.../pulsar/functions/instance/InstanceConfig.java | 1 +
.../functions/instance/JavaInstanceRunnable.java | 56 ++++++----
.../instance/src/main/python/contextimpl.py | 42 +++++---
.../instance/src/main/python/function_stats.py | 75 ++++++++++++++
.../instance/src/main/python/python_instance.py | 115 ++++++++-------------
.../src/main/python/python_instance_main.py | 8 +-
.../pulsar/functions/instance/ContextImplTest.java | 3 +-
.../instance/JavaInstanceRunnableTest.java | 2 +-
.../src/test/python/test_python_instance.py | 2 +-
.../pulsar/functions/runtime/JavaInstanceMain.java | 34 +++++-
.../functions/runtime/KubernetesRuntime.java | 79 ++------------
.../pulsar/functions/runtime/LocalRunner.java | 1 +
.../pulsar/functions/runtime/ProcessRuntime.java | 4 +-
.../pulsar/functions/runtime/RuntimeUtils.java | 9 +-
.../pulsar/functions/runtime/ThreadRuntime.java | 15 ++-
.../functions/runtime/ThreadRuntimeFactory.java | 12 ++-
.../functions/runtime/KubernetesRuntimeTest.java | 22 ++--
.../functions/runtime/ProcessRuntimeTest.java | 21 ++--
.../pulsar/functions/worker/FunctionActioner.java | 1 +
.../functions/worker/FunctionRuntimeManager.java | 4 +-
.../functions/worker/FunctionsStatsGenerator.java | 8 +-
.../worker/FunctionStatsGeneratorTest.java | 20 ++--
.../functions/worker/SchedulerManagerTest.java | 22 ++--
25 files changed, 417 insertions(+), 270 deletions(-)
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 406fe13..d13964d 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -18,25 +18,12 @@
*/
package org.apache.pulsar.functions.instance;
-import static com.google.common.base.Preconditions.checkState;
-
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Summary;
import lombok.Getter;
import lombok.Setter;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
@@ -55,9 +42,24 @@ import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkState;
+
/**
* This class implements the Context interface exposed to the user.
*/
+
class ContextImpl implements Context, SinkContext, SourceContext {
private InstanceConfig config;
private Logger logger;
@@ -92,7 +94,6 @@ class ContextImpl implements Context, SinkContext,
SourceContext {
}
}
- private ConcurrentMap<String, AccumulatedMetricDatum>
currentAccumulatedMetrics;
private ConcurrentMap<String, AccumulatedMetricDatum> accumulatedMetrics;
private Map<String, Producer<?>> publishProducers;
@@ -110,11 +111,21 @@ class ContextImpl implements Context, SinkContext,
SourceContext {
private StateContextImpl stateContext;
private Map<String, Object> userConfigs;
+ Map<String, String[]> userMetricsLabels = new HashMap<>();
+ private final String[] metricsLabels;
+ private final Summary userMetricsSummary;
+
+ private final static String[] userMetricsLabelNames;
+ static {
+ // add label to indicate user metric
+ userMetricsLabelNames = Arrays.copyOf(FunctionStats.metricsLabelNames,
FunctionStats.metricsLabelNames.length + 1);
+ userMetricsLabelNames[FunctionStats.metricsLabelNames.length] =
"metric";
+ }
+
public ContextImpl(InstanceConfig config, Logger logger, PulsarClient
client, List<String> inputTopics,
- SecretsProvider secretsProvider) {
+ SecretsProvider secretsProvider, CollectorRegistry
collectorRegistry, String[] metricsLabels) {
this.config = config;
this.logger = logger;
- this.currentAccumulatedMetrics = new ConcurrentHashMap<>();
this.accumulatedMetrics = new ConcurrentHashMap<>();
this.publishProducers = new HashMap<>();
this.inputTopics = inputTopics;
@@ -138,6 +149,17 @@ class ContextImpl implements Context, SinkContext,
SourceContext {
} else {
secretsMap = new HashMap<>();
}
+
+ this.metricsLabels = metricsLabels;
+ this.userMetricsSummary = Summary.build()
+ .name("pulsar_function_user_metric")
+ .help("Pulsar Function user defined metric.")
+ .labelNames(userMetricsLabelNames)
+ .quantile(0.5, 0.01)
+ .quantile(0.9, 0.01)
+ .quantile(0.99, 0.01)
+ .quantile(0.999, 0.01)
+ .register(collectorRegistry);
}
public void setCurrentMessageContext(Record<?> record) {
@@ -320,8 +342,16 @@ class ContextImpl implements Context, SinkContext,
SourceContext {
@Override
public void recordMetric(String metricName, double value) {
- currentAccumulatedMetrics.putIfAbsent(metricName, new
AccumulatedMetricDatum());
- currentAccumulatedMetrics.get(metricName).update(value);
+ userMetricsLabels.computeIfAbsent(metricName,
+ s -> {
+ String[] userMetricLabels = Arrays.copyOf(metricsLabels,
metricsLabels.length + 1);
+ userMetricLabels[userMetricLabels.length - 1] = metricName;
+ return userMetricLabels;
+ });
+
+
userMetricsSummary.labels(userMetricsLabels.get(metricName)).observe(value);
+ accumulatedMetrics.putIfAbsent(metricName, new
AccumulatedMetricDatum());
+ accumulatedMetrics.get(metricName).update(value);
}
public MetricsData getAndResetMetrics() {
@@ -331,9 +361,8 @@ class ContextImpl implements Context, SinkContext,
SourceContext {
}
public void resetMetrics() {
+ userMetricsSummary.clear();
this.accumulatedMetrics.clear();
- this.accumulatedMetrics.putAll(currentAccumulatedMetrics);
- this.currentAccumulatedMetrics.clear();
}
public MetricsData getMetrics() {
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java
index 15b01f7..c456997 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.functions.instance;
import com.google.common.collect.EvictingQueue;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
import io.prometheus.client.Summary;
import lombok.Getter;
import lombok.Setter;
@@ -35,7 +36,16 @@ import
org.apache.pulsar.functions.proto.InstanceCommunication;
@Setter
public class FunctionStats {
- private static final String[] metricsLabelNames = {"tenant", "namespace",
"name", "instance_id"};
+ static final String[] metricsLabelNames = {"tenant", "namespace",
"function", "instance_id", "cluster"};
+
+ /** Declare metric names **/
+ static final String PULSAR_FUNCTION_PROCESSED_TOTAL =
"pulsar_function_processed_total";
+ static final String PULSAR_FUNCTION_PROCESSED_SUCCESSFULLY_TOTAL =
"pulsar_function_processed_successfully_total";
+ static final String PULSAR_FUNCTION_SYSTEM_EXCEPTIONS_TOTAL =
"pulsar_function_system_exceptions_total";
+ static final String PULSAR_FUNCTION_USER_EXCEPTIONS_TOTAL =
"pulsar_function_user_exceptions_total";
+ static final String PULSAR_FUNCTION_PROCESS_LATENCY_MS =
"pulsar_function_process_latency_ms";
+ static final String PULSAR_FUNCTION_LAST_INVOCATION =
"pulsar_function_last_invocation";
+ static final String PULSAR_FUNCTION_RECEIVED_TOTAL =
"pulsar_function_received_total";
/** Declare Prometheus stats **/
@@ -49,6 +59,10 @@ public class FunctionStats {
final Summary statProcessLatency;
+ final Gauge statlastInvocation;
+
+ final Counter statTotalRecordsRecieved;
+
CollectorRegistry functionCollectorRegistry;
@Getter
@@ -56,47 +70,56 @@ public class FunctionStats {
@Getter
private
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
latestSystemExceptions = EvictingQueue.create(10);
- @Getter
- @Setter
- private long lastInvocationTime = 0;
-
- public FunctionStats() {
+ public FunctionStats(CollectorRegistry collectorRegistry) {
// Declare function local collector registry so that it will not clash
with other function instances'
// metrics collection especially in threaded mode
functionCollectorRegistry = new CollectorRegistry();
statTotalProcessed = Counter.build()
- .name("__function_total_processed__")
+ .name(PULSAR_FUNCTION_PROCESSED_TOTAL)
.help("Total number of messages processed.")
.labelNames(metricsLabelNames)
- .register(functionCollectorRegistry);
+ .register(collectorRegistry);
statTotalProcessedSuccessfully = Counter.build()
- .name("__function_total_successfully_processed__")
+ .name(PULSAR_FUNCTION_PROCESSED_SUCCESSFULLY_TOTAL)
.help("Total number of messages processed successfully.")
.labelNames(metricsLabelNames)
- .register(functionCollectorRegistry);
+ .register(collectorRegistry);
statTotalSysExceptions = Counter.build()
- .name("__function_total_system_exceptions__")
+ .name(PULSAR_FUNCTION_SYSTEM_EXCEPTIONS_TOTAL)
.help("Total number of system exceptions.")
.labelNames(metricsLabelNames)
- .register(functionCollectorRegistry);
+ .register(collectorRegistry);
statTotalUserExceptions = Counter.build()
- .name("__function_total_user_exceptions__")
+ .name(PULSAR_FUNCTION_USER_EXCEPTIONS_TOTAL)
.help("Total number of user exceptions.")
.labelNames(metricsLabelNames)
- .register(functionCollectorRegistry);
+ .register(collectorRegistry);
statProcessLatency = Summary.build()
- .name("__function_process_latency_ms__").help("Process latency
in milliseconds.")
+ .name(PULSAR_FUNCTION_PROCESS_LATENCY_MS)
+ .help("Process latency in milliseconds.")
.quantile(0.5, 0.01)
.quantile(0.9, 0.01)
.quantile(0.99, 0.01)
.quantile(0.999, 0.01)
.labelNames(metricsLabelNames)
- .register(functionCollectorRegistry);
+ .register(collectorRegistry);
+
+ statlastInvocation = Gauge.build()
+ .name(PULSAR_FUNCTION_LAST_INVOCATION)
+ .help("The timestamp of the last invocation of the function")
+ .labelNames(metricsLabelNames)
+ .register(collectorRegistry);
+
+ statTotalRecordsRecieved = Counter.build()
+ .name(PULSAR_FUNCTION_RECEIVED_TOTAL)
+ .help("Total number of messages received from source.")
+ .labelNames(metricsLabelNames)
+ .register(collectorRegistry);
}
public void addUserException(Exception ex) {
@@ -120,8 +143,9 @@ public class FunctionStats {
statTotalSysExceptions.clear();
statTotalUserExceptions.clear();
statProcessLatency.clear();
+ statlastInvocation.clear();
+ statTotalRecordsRecieved.clear();
latestUserExceptions.clear();
latestSystemExceptions.clear();
- lastInvocationTime = 0;
}
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
index 040af91..51ec0d4 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
@@ -41,6 +41,7 @@ public class InstanceConfig {
private FunctionDetails functionDetails;
private int maxBufferedTuples;
private int port;
+ private String clusterName;
/**
* Get the string representation of {@link #getInstanceId()}.
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 9e5ae51..ab900ac 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -22,6 +22,7 @@ package org.apache.pulsar.functions.instance;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import io.netty.buffer.ByteBuf;
+import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Summary;
import lombok.AccessLevel;
import lombok.Getter;
@@ -45,6 +46,8 @@ import org.apache.logging.log4j.core.config.LoggerConfig;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.functions.ConsumerConfig;
+import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.state.StateContextImpl;
@@ -58,8 +61,6 @@ import org.apache.pulsar.functions.sink.PulsarSinkConfig;
import org.apache.pulsar.functions.sink.PulsarSinkDisable;
import org.apache.pulsar.functions.source.PulsarSource;
import org.apache.pulsar.functions.source.PulsarSourceConfig;
-import org.apache.pulsar.common.functions.ConsumerConfig;
-import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.StateUtils;
@@ -116,6 +117,8 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
private Sink sink;
private final SecretsProvider secretsProvider;
+
+ private CollectorRegistry collectorRegistry;
private final String[] metricsLabels;
public JavaInstanceRunnable(InstanceConfig instanceConfig,
@@ -123,19 +126,25 @@ public class JavaInstanceRunnable implements
AutoCloseable, Runnable {
String jarFile,
PulsarClient pulsarClient,
String stateStorageServiceUrl,
- SecretsProvider secretsProvider) {
+ SecretsProvider secretsProvider,
+ CollectorRegistry collectorRegistry) {
this.instanceConfig = instanceConfig;
this.fnCache = fnCache;
this.jarFile = jarFile;
this.client = (PulsarClientImpl) pulsarClient;
this.stateStorageServiceUrl = stateStorageServiceUrl;
- this.stats = new FunctionStats();
+ this.stats = new FunctionStats(collectorRegistry);
this.secretsProvider = secretsProvider;
+ this.collectorRegistry = collectorRegistry;
this.metricsLabels = new String[]{
instanceConfig.getFunctionDetails().getTenant(),
- instanceConfig.getFunctionDetails().getNamespace(),
- instanceConfig.getFunctionDetails().getName(),
- String.valueOf(instanceConfig.getInstanceId())
+ String.format("%s/%s",
instanceConfig.getFunctionDetails().getTenant(),
+ instanceConfig.getFunctionDetails().getNamespace()),
+ String.format("%s/%s/%s",
instanceConfig.getFunctionDetails().getTenant(),
+ instanceConfig.getFunctionDetails().getNamespace(),
+ instanceConfig.getFunctionDetails().getName()),
+ String.valueOf(instanceConfig.getInstanceId()),
+ instanceConfig.getClusterName()
};
}
@@ -181,7 +190,7 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
}
Logger instanceLog = LoggerFactory.getLogger(
"function-" + instanceConfig.getFunctionDetails().getName());
- return new ContextImpl(instanceConfig, instanceLog, client,
inputTopics, secretsProvider);
+ return new ContextImpl(instanceConfig, instanceLog, client,
inputTopics, secretsProvider, collectorRegistry, metricsLabels);
}
/**
@@ -201,6 +210,9 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
while (true) {
currentRecord = readInput();
+ // increment number of records received from source
+ stats.statTotalRecordsRecieved.labels(metricsLabels).inc();
+
if
(instanceConfig.getFunctionDetails().getProcessingGuarantees() ==
org.apache.pulsar.functions
.proto.Function.ProcessingGuarantees.ATMOST_ONCE) {
if (instanceConfig.getFunctionDetails().getAutoAck()) {
@@ -212,7 +224,7 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
JavaExecutionResult result;
// set last invocation time
- stats.setLastInvocationTime(System.currentTimeMillis());
+
stats.statlastInvocation.labels(metricsLabels).set(System.currentTimeMillis());
// start time for process latency stat
Summary.Timer requestTimer =
stats.statProcessLatency.labels(metricsLabels).startTimer();
@@ -331,7 +343,6 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
stats.addUserException(result.getUserException() );
srcRecord.fail();
} else {
- stats.statTotalProcessedSuccessfully.labels(metricsLabels).inc();
if (result.getResult() != null) {
sendOutputMessage(srcRecord, result.getResult());
} else {
@@ -340,6 +351,8 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
srcRecord.ack();
}
}
+ // increment total successfully processed
+ stats.statTotalProcessedSuccessfully.labels(metricsLabels).inc();
}
}
@@ -439,22 +452,24 @@ public class JavaInstanceRunnable implements
AutoCloseable, Runnable {
private Builder createMetricsDataBuilder() {
InstanceCommunication.MetricsData.Builder bldr =
InstanceCommunication.MetricsData.newBuilder();
- addSystemMetrics("__total_processed__",
stats.statTotalProcessed.labels(metricsLabels).get(), bldr);
- addSystemMetrics("__total_successfully_processed__",
stats.statTotalProcessedSuccessfully.labels(metricsLabels).get(), bldr);
- addSystemMetrics("__total_system_exceptions__",
stats.statTotalSysExceptions.labels(metricsLabels).get(), bldr);
- addSystemMetrics("__total_user_exceptions__",
stats.statTotalUserExceptions.labels(metricsLabels).get(), bldr);
- addSystemMetrics("__avg_latency_ms__",
+ addSystemMetrics(FunctionStats.PULSAR_FUNCTION_PROCESSED_TOTAL,
stats.statTotalProcessed.labels(metricsLabels).get(), bldr);
+
addSystemMetrics(FunctionStats.PULSAR_FUNCTION_PROCESSED_SUCCESSFULLY_TOTAL,
stats.statTotalProcessedSuccessfully.labels(metricsLabels).get(), bldr);
+
addSystemMetrics(FunctionStats.PULSAR_FUNCTION_SYSTEM_EXCEPTIONS_TOTAL,
stats.statTotalSysExceptions.labels(metricsLabels).get(), bldr);
+ addSystemMetrics(FunctionStats.PULSAR_FUNCTION_USER_EXCEPTIONS_TOTAL,
stats.statTotalUserExceptions.labels(metricsLabels).get(), bldr);
+ addSystemMetrics(FunctionStats.PULSAR_FUNCTION_RECEIVED_TOTAL,
stats.statTotalRecordsRecieved.labels(metricsLabels).get(), bldr);
+ addSystemMetrics(FunctionStats.PULSAR_FUNCTION_PROCESS_LATENCY_MS,
stats.statProcessLatency.labels(metricsLabels).get().count <=
0.0
? 0 :
stats.statProcessLatency.labels(metricsLabels).get().sum /
stats.statProcessLatency.labels(metricsLabels).get().count,
bldr);
+ addSystemMetrics(FunctionStats.PULSAR_FUNCTION_LAST_INVOCATION,
stats.statlastInvocation.labels(metricsLabels).get(), bldr);
return bldr;
}
public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() {
InstanceCommunication.FunctionStatus.Builder functionStatusBuilder =
InstanceCommunication.FunctionStatus.newBuilder();
-
functionStatusBuilder.setNumProcessed((long)stats.statTotalProcessed.labels(metricsLabels).get());
-
functionStatusBuilder.setNumSuccessfullyProcessed((long)stats.statTotalProcessedSuccessfully.labels(metricsLabels).get());
-
functionStatusBuilder.setNumUserExceptions((long)stats.statTotalUserExceptions.labels(metricsLabels).get());
+ functionStatusBuilder.setNumProcessed((long)
stats.statTotalProcessed.labels(metricsLabels).get());
+ functionStatusBuilder.setNumSuccessfullyProcessed((long)
stats.statTotalProcessedSuccessfully.labels(metricsLabels).get());
+ functionStatusBuilder.setNumUserExceptions((long)
stats.statTotalUserExceptions.labels(metricsLabels).get());
stats.getLatestUserExceptions().forEach(ex -> {
functionStatusBuilder.addLatestUserExceptions(ex);
});
@@ -464,8 +479,9 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
});
functionStatusBuilder.setAverageLatency(
stats.statProcessLatency.labels(metricsLabels).get().count ==
0.0
- ? 0 :
stats.statProcessLatency.labels(metricsLabels).get().sum /
stats.statProcessLatency.labels(metricsLabels).get().count);
-
functionStatusBuilder.setLastInvocationTime(stats.getLastInvocationTime());
+ ? 0 :
stats.statProcessLatency.labels(metricsLabels).get().sum /
stats.statProcessLatency
+ .labels(metricsLabels).get().count);
+ functionStatusBuilder.setLastInvocationTime((long)
stats.statlastInvocation.labels(metricsLabels).get());
return functionStatusBuilder;
}
diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py
b/pulsar-functions/instance/src/main/python/contextimpl.py
index 83a63aa..9a37d59 100644
--- a/pulsar-functions/instance/src/main/python/contextimpl.py
+++ b/pulsar-functions/instance/src/main/python/contextimpl.py
@@ -31,6 +31,9 @@ import pulsar
import util
import InstanceCommunication_pb2
+from prometheus_client import Summary
+from function_stats import Stats
+
# For keeping track of accumulated metrics
class AccumulatedMetricDatum(object):
def __init__(self):
@@ -39,7 +42,7 @@ class AccumulatedMetricDatum(object):
self.max = float('-inf')
self.min = float('inf')
- def record(self, value):
+ def update(self, value):
self.count += 1
self.sum += value
if value > self.max:
@@ -48,14 +51,17 @@ class AccumulatedMetricDatum(object):
self.min = value
class ContextImpl(pulsar.Context):
- def __init__(self, instance_config, logger, pulsar_client, user_code,
consumers, secrets_provider):
+
+ # add label to indicate user metric
+ user_metrics_label_names = Stats.metrics_label_names + ["metric"]
+
+ def __init__(self, instance_config, logger, pulsar_client, user_code,
consumers, secrets_provider, metrics_labels):
self.instance_config = instance_config
self.log = logger
self.pulsar_client = pulsar_client
self.user_code_dir = os.path.dirname(user_code)
self.consumers = consumers
self.secrets_provider = secrets_provider
- self.current_accumulated_metrics = {}
self.accumulated_metrics = {}
self.publish_producers = {}
self.publish_serializers = {}
@@ -69,6 +75,12 @@ class ContextImpl(pulsar.Context):
if instance_config.function_details.secretsMap \
else {}
+ self.metrics_labels = metrics_labels
+ self.user_metrics_labels = dict()
+ self.user_metrics_summary = Summary("pulsar_function_user_metric",
+ 'Pulsar Function user defined metric',
+ ContextImpl.user_metrics_label_names)
+
# Called on a per message basis to set the context for the current message
def set_current_message_context(self, msgid, topic):
self.current_message_id = msgid
@@ -117,9 +129,12 @@ class ContextImpl(pulsar.Context):
return self.secrets_provider.provide_secret(secret_key,
self.secrets_map[secret_key])
def record_metric(self, metric_name, metric_value):
- if not metric_name in self.current_accumulated_metrics:
- self.current_accumulated_metrics[metric_name] = AccumulatedMetricDatum()
- self.current_accumulated_metrics[metric_name].update(metric_value)
+ if metric_name not in self.user_metrics_labels:
+ self.user_metrics_labels[metric_name] = self.metrics_labels +
[metric_name]
+
self.user_metrics_summary.labels(*self.user_metrics_labels[metric_name]).observe(metric_value)
+ if not metric_name in self.accumulated_metrics:
+ self.accumulated_metrics[metric_name] = AccumulatedMetricDatum()
+ self.accumulated_metrics[metric_name].update(metric_value)
def get_output_topic(self):
return self.instance_config.function_details.output
@@ -164,17 +179,16 @@ class ContextImpl(pulsar.Context):
def reset_metrics(self):
# TODO: Make it thread safe
+ for labels in self.user_metrics_labels.values():
+ self.user_metrics_summary.labels(*labels)._sum.set(0.0)
+ self.user_metrics_summary.labels(*labels)._count.set(0.0)
self.accumulated_metrics.clear()
- self.accumulated_metrics.update(self.current_accumulated_metrics)
- self.current_accumulated_metrics.clear()
def get_metrics(self):
metrics = InstanceCommunication_pb2.MetricsData()
for metric_name, accumulated_metric in self.accumulated_metrics.items():
- m = InstanceCommunication_pb2.MetricsData.DataDigest()
- m.count = accumulated_metric.count
- m.sum = accumulated_metric.sum
- m.max = accumulated_metric.max
- m.min = accumulated_metric.min
- metrics.metrics[metric_name] = m
+ metrics.metrics[metric_name].count = accumulated_metric.count
+ metrics.metrics[metric_name].sum = accumulated_metric.sum
+ metrics.metrics[metric_name].max = accumulated_metric.max
+ metrics.metrics[metric_name].min = accumulated_metric.min
return metrics
diff --git a/pulsar-functions/instance/src/main/python/function_stats.py
b/pulsar-functions/instance/src/main/python/function_stats.py
new file mode 100644
index 0000000..13b3f84
--- /dev/null
+++ b/pulsar-functions/instance/src/main/python/function_stats.py
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import traceback
+import time
+
+from prometheus_client import Counter, Summary, Gauge
+
+# We keep track of the following metrics
+class Stats(object):
+ metrics_label_names = ['tenant', 'namespace', 'function', 'instance_id',
'cluster']
+
+ TOTAL_PROCESSED = 'pulsar_function_processed_total'
+ TOTAL_SUCCESSFULLY_PROCESSED = 'pulsar_function_processed_successfully_total'
+ TOTAL_SYSTEM_EXCEPTIONS = 'pulsar_function_system_exceptions_total'
+ TOTAL_USER_EXCEPTIONS = 'pulsar_function_user_exceptions_total'
+ PROCESS_LATENCY_MS = 'pulsar_function_process_latency_ms'
+ LAST_INVOCATION = 'pulsar_function_last_invocation'
+ TOTAL_RECEIVED = 'pulsar_function_received_total'
+
+ # Declare Prometheus
+ stat_total_processed = Counter(TOTAL_PROCESSED, 'Total number of messages
processed.', metrics_label_names)
+ stat_total_processed_successfully = Counter(TOTAL_SUCCESSFULLY_PROCESSED,
+ 'Total number of messages
processed successfully.', metrics_label_names)
+ stat_total_sys_exceptions = Counter(TOTAL_SYSTEM_EXCEPTIONS, 'Total number
of system exceptions.',
+ metrics_label_names)
+ stat_total_user_exceptions = Counter(TOTAL_USER_EXCEPTIONS, 'Total number of
user exceptions.',
+ metrics_label_names)
+
+ stat_process_latency_ms = Summary(PROCESS_LATENCY_MS, 'Process latency in
milliseconds.', metrics_label_names)
+
+ stat_last_invocation = Gauge(LAST_INVOCATION, 'The timestamp of the last
invocation of the function.', metrics_label_names)
+
+ stat_total_received = Counter(TOTAL_RECEIVED, 'Total number of messages
received from source.', metrics_label_names)
+
+ latest_user_exception = []
+ latest_sys_exception = []
+
+ def add_user_exception(self):
+ self.latest_sys_exception.append((traceback.format_exc(), int(time.time()
* 1000)))
+ if len(self.latest_sys_exception) > 10:
+ self.latest_sys_exception.pop(0)
+
+ def add_sys_exception(self):
+ self.latest_sys_exception.append((traceback.format_exc(), int(time.time()
* 1000)))
+ if len(self.latest_sys_exception) > 10:
+ self.latest_sys_exception.pop(0)
+
+ def reset(self, metrics_labels):
+ self.latest_user_exception = []
+ self.latest_sys_exception = []
+ self.stat_total_processed.labels(*metrics_labels)._value.set(0.0)
+
self.stat_total_processed_successfully.labels(*metrics_labels)._value.set(0.0)
+ self.stat_total_user_exceptions.labels(*metrics_labels)._value.set(0.0)
+ self.stat_total_sys_exceptions.labels(*metrics_labels)._value.set(0.0)
+ self.stat_process_latency_ms.labels(*metrics_labels)._sum.set(0.0)
+ self.stat_process_latency_ms.labels(*metrics_labels)._count.set(0.0)
+ self.stat_last_invocation.labels(*metrics_labels).set(0.0)
+ self.stat_total_received.labels(*metrics_labels)._value.set(0.0)
\ No newline at end of file
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py
b/pulsar-functions/instance/src/main/python/python_instance.py
index 66d222c..1d56a39 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -31,14 +31,8 @@ try:
except:
import queue
import threading
-from functools import partial
-from collections import namedtuple
-from threading import Timer
-from prometheus_client import Counter, Summary
-import traceback
import sys
import re
-
import pulsar
import contextimpl
import Function_pb2
@@ -46,6 +40,11 @@ import log
import util
import InstanceCommunication_pb2
+from functools import partial
+from collections import namedtuple
+from threading import Timer
+from function_stats import Stats
+
Log = log.Log
# Equivalent of the InstanceConfig in Java
InstanceConfig = namedtuple('InstanceConfig', 'instance_id function_id
function_version function_details max_buffered_tuples')
@@ -68,55 +67,9 @@ def base64ify(bytes_or_str):
else:
return output_bytes
-# We keep track of the following metrics
-class Stats(object):
- metrics_label_names = ['tenant', 'namespace', 'name', 'instance_id']
-
- TOTAL_PROCESSED = '__function_total_processed__'
- TOTAL_SUCCESSFULLY_PROCESSED = '__function_total_successfully_processed__'
- TOTAL_SYSTEM_EXCEPTIONS = '__function_total_system_exceptions__'
- TOTAL_USER_EXCEPTIONS = '__function_total_user_exceptions__'
- PROCESS_LATENCY_MS = '__function_process_latency_ms__'
-
- # Declare Prometheus
- stat_total_processed = Counter(TOTAL_PROCESSED, 'Total number of messages
processed.', metrics_label_names)
- stat_total_processed_successfully = Counter(TOTAL_SUCCESSFULLY_PROCESSED,
- 'Total number of messages
processed successfully.', metrics_label_names)
- stat_total_sys_exceptions = Counter(TOTAL_SYSTEM_EXCEPTIONS, 'Total number
of system exceptions.',
- metrics_label_names)
- stat_total_user_exceptions = Counter(TOTAL_USER_EXCEPTIONS, 'Total number of
user exceptions.',
- metrics_label_names)
-
- stats_process_latency_ms = Summary(PROCESS_LATENCY_MS, 'Process latency in
milliseconds.', metrics_label_names)
-
- latest_user_exception = []
- latest_sys_exception = []
-
- last_invocation_time = 0.0
-
- def add_user_exception(self):
- self.latest_sys_exception.append((traceback.format_exc(), int(time.time()
* 1000)))
- if len(self.latest_sys_exception) > 10:
- self.latest_sys_exception.pop(0)
-
- def add_sys_exception(self):
- self.latest_sys_exception.append((traceback.format_exc(), int(time.time()
* 1000)))
- if len(self.latest_sys_exception) > 10:
- self.latest_sys_exception.pop(0)
-
- def reset(self, metrics_labels):
- self.latest_user_exception = []
- self.latest_sys_exception = []
- self.stat_total_processed.labels(*metrics_labels)._value.set(0.0)
-
self.stat_total_processed_successfully.labels(*metrics_labels)._value.set(0.0)
- self.stat_total_user_exceptions.labels(*metrics_labels)._value.set(0.0)
- self.stat_total_sys_exceptions.labels(*metrics_labels)._value.set(0.0)
- self.stats_process_latency_ms.labels(*metrics_labels)._sum.set(0)
- self.stats_process_latency_ms.labels(*metrics_labels)._count.set(0);
- self.last_invocation_time = 0.0
-
class PythonInstance(object):
- def __init__(self, instance_id, function_id, function_version,
function_details, max_buffered_tuples, expected_healthcheck_interval,
user_code, pulsar_client, secrets_provider):
+ def __init__(self, instance_id, function_id, function_version,
function_details, max_buffered_tuples,
+ expected_healthcheck_interval, user_code, pulsar_client,
secrets_provider, cluster_name):
self.instance_config = InstanceConfig(instance_id, function_id,
function_version, function_details, max_buffered_tuples)
self.user_code = user_code
self.queue = queue.Queue(max_buffered_tuples)
@@ -140,7 +93,10 @@ class PythonInstance(object):
self.timeout_ms = function_details.source.timeoutMs if
function_details.source.timeoutMs > 0 else None
self.expected_healthcheck_interval = expected_healthcheck_interval
self.secrets_provider = secrets_provider
- self.metrics_labels = [function_details.tenant,
function_details.namespace, function_details.name, instance_id]
+ self.metrics_labels = [function_details.tenant,
+ "%s/%s" % (function_details.tenant,
function_details.namespace),
+ "%s/%s/%s" % (function_details.tenant,
function_details.namespace, function_details.name),
+ instance_id, cluster_name]
def health_check(self):
self.last_health_check_ts = time.time()
@@ -210,7 +166,9 @@ class PythonInstance(object):
except:
self.function_purefunction = function_kclass
- self.contextimpl = contextimpl.ContextImpl(self.instance_config, Log,
self.pulsar_client, self.user_code, self.consumers, self.secrets_provider)
+ self.contextimpl = contextimpl.ContextImpl(self.instance_config, Log,
self.pulsar_client,
+ self.user_code, self.consumers,
+ self.secrets_provider,
self.metrics_labels)
# Now launch a thread that does execution
self.execution_thread = threading.Thread(target=self.actual_execution)
self.execution_thread.start()
@@ -242,13 +200,13 @@ class PythonInstance(object):
try:
# get user function start time for statistic calculation
start_time = time.time()
- self.stats.last_invocation_time = start_time * 1000.0
+
Stats.stat_last_invocation.labels(*self.metrics_labels).set(start_time * 1000.0)
if self.function_class is not None:
output_object = self.function_class.process(input_object,
self.contextimpl)
else:
output_object = self.function_purefunction.process(input_object)
successfully_executed = True
-
Stats.stats_process_latency_ms.labels(*self.metrics_labels).observe((time.time()
- start_time) * 1000.0)
+
Stats.stat_process_latency_ms.labels(*self.metrics_labels).observe((time.time()
- start_time) * 1000.0)
Stats.stat_total_processed.labels(*self.metrics_labels).inc()
except Exception as e:
Log.exception("Exception while executing user method")
@@ -310,6 +268,8 @@ class PythonInstance(object):
max_pending_messages=100000)
def message_listener(self, serde, consumer, message):
+ # increment number of received records from source
+ Stats.stat_total_received.labels(*self.metrics_labels).inc()
item = InternalMessage(message, consumer.topic(), serde, consumer)
self.queue.put(item, True)
if self.atmost_once and self.auto_ack:
@@ -329,14 +289,16 @@ class PythonInstance(object):
# First get any user metrics
metrics = self.contextimpl.get_metrics()
# Now add system metrics as well
- self.add_system_metrics("__total_processed__",
Stats.stat_total_processed.labels(*self.metrics_labels)._value.get(), metrics)
- self.add_system_metrics("__total_successfully_processed__",
Stats.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get(),
metrics)
- self.add_system_metrics("__total_system_exceptions__",
Stats.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get(),
metrics)
- self.add_system_metrics("__total_user_exceptions__",
Stats.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get(),
metrics)
- self.add_system_metrics("__avg_latency_ms__",
- 0.0 if
Stats.stats_process_latency_ms.labels(*self.metrics_labels)._count.get() <= 0.0
- else
Stats.stats_process_latency_ms.labels(*self.metrics_labels)._sum.get() /
Stats.stats_process_latency_ms.labels(*self.metrics_labels)._count.get(),
+ self.add_system_metrics(Stats.TOTAL_PROCESSED,
Stats.stat_total_processed.labels(*self.metrics_labels)._value.get(), metrics)
+ self.add_system_metrics(Stats.TOTAL_SUCCESSFULLY_PROCESSED,
Stats.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get(),
metrics)
+ self.add_system_metrics(Stats.TOTAL_SYSTEM_EXCEPTIONS,
Stats.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get(),
metrics)
+ self.add_system_metrics(Stats.TOTAL_USER_EXCEPTIONS,
Stats.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get(),
metrics)
+ self.add_system_metrics(Stats.PROCESS_LATENCY_MS,
+ 0.0 if
Stats.stat_process_latency_ms.labels(*self.metrics_labels)._count.get() <= 0.0
+ else
Stats.stat_process_latency_ms.labels(*self.metrics_labels)._sum.get() /
Stats.stat_process_latency_ms.labels(*self.metrics_labels)._count.get(),
metrics)
+ self.add_system_metrics(Stats.TOTAL_RECEIVED,
Stats.stat_total_received.labels(*self.metrics_labels)._value.get(), metrics)
+ self.add_system_metrics(Stats.LAST_INVOCATION,
Stats.stat_last_invocation.labels(*self.metrics_labels)._value.get(), metrics)
return metrics
def add_system_metrics(self, metric_name, value, metrics):
@@ -348,23 +310,32 @@ class PythonInstance(object):
def get_function_status(self):
status = InstanceCommunication_pb2.FunctionStatus()
status.running = True
- status.numProcessed =
long(Stats.stat_total_processed.labels(*self.metrics_labels)._value.get())
- status.numSuccessfullyProcessed =
long(Stats.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get())
- status.numUserExceptions =
long(Stats.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get())
+
+ total_processed =
Stats.stat_total_processed.labels(*self.metrics_labels)._value.get()
+ stat_total_processed_successfully =
Stats.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get()
+ stat_total_user_exceptions =
Stats.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get()
+ stat_total_sys_exceptions =
Stats.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get()
+ stat_process_latency_ms_count =
Stats.stat_process_latency_ms.labels(*self.metrics_labels)._count.get()
+ stat_process_latency_ms_sum =
Stats.stat_process_latency_ms.labels(*self.metrics_labels)._sum.get()
+ stat_last_invocation =
Stats.stat_last_invocation.labels(*self.metrics_labels)._value.get()
+
+ status.numProcessed = int(total_processed) if sys.version_info.major >= 3
else long(total_processed)
+ status.numSuccessfullyProcessed = int(stat_total_processed_successfully)
if sys.version_info.major >= 3 else long(stat_total_processed_successfully)
+ status.numUserExceptions = int(stat_total_user_exceptions) if
sys.version_info.major >= 3 else long(stat_total_user_exceptions)
status.instanceId = self.instance_config.instance_id
for ex, tm in self.stats.latest_user_exception:
to_add = status.latestUserExceptions.add()
to_add.exceptionString = ex
to_add.msSinceEpoch = tm
- status.numSystemExceptions =
long(Stats.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get())
+ status.numSystemExceptions = int(stat_total_sys_exceptions) if
sys.version_info.major >= 3 else long(stat_total_sys_exceptions)
for ex, tm in self.stats.latest_sys_exception:
to_add = status.latestSystemExceptions.add()
to_add.exceptionString = ex
to_add.msSinceEpoch = tm
status.averageLatency = 0.0 \
- if
Stats.stats_process_latency_ms.labels(*self.metrics_labels)._count.get() <= 0.0
\
- else
Stats.stats_process_latency_ms.labels(*self.metrics_labels)._sum.get() /
Stats.stats_process_latency_ms.labels(*self.metrics_labels)._count.get()
- status.lastInvocationTime = long(self.stats.last_invocation_time)
+ if stat_process_latency_ms_count <= 0.0 \
+ else stat_process_latency_ms_sum / stat_process_latency_ms_count
+ status.lastInvocationTime = int(stat_last_invocation) if
sys.version_info.major >= 3 else long(stat_last_invocation)
return status
def join(self):
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py
b/pulsar-functions/instance/src/main/python/python_instance_main.py
index d256269..2a02380 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -39,6 +39,8 @@ import log
import server
import python_instance
import util
+import prometheus_client
+
from google.protobuf import json_format
to_run = True
@@ -69,6 +71,7 @@ def main():
parser.add_argument('--hostname_verification_enabled', required=False,
help='Enable hostname verification')
parser.add_argument('--tls_trust_cert_path', required=False, help='Tls trust
cert file path')
parser.add_argument('--port', required=True, help='Instance Port', type=int)
+ parser.add_argument('--metrics_port', required=True, help="Port metrics will
be exposed on", type=int)
parser.add_argument('--max_buffered_tuples', required=True, help='Maximum
number of Buffered tuples')
parser.add_argument('--logging_directory', required=True, help='Logging
Directory')
parser.add_argument('--logging_file', required=True, help='Log file name')
@@ -79,6 +82,7 @@ def main():
parser.add_argument('--install_usercode_dependencies', required=False,
help='For packaged python like wheel files, do we need to install all
dependencies', type=bool)
parser.add_argument('--dependency_repository', required=False, help='For
packaged python like wheel files, which repository to pull the dependencies
from')
parser.add_argument('--extra_dependency_repository', required=False,
help='For packaged python like wheel files, any extra repository to pull the
dependencies from')
+ parser.add_argument('--cluster_name', required=True, help='The name of the
cluster this instance is running on')
args = parser.parse_args()
function_details = Function_pb2.FunctionDetails()
@@ -166,10 +170,12 @@ def main():
str(args.function_version),
function_details,
int(args.max_buffered_tuples),
int(args.expected_healthcheck_interval),
- str(args.py), pulsar_client,
secrets_provider)
+ str(args.py), pulsar_client,
secrets_provider, args.cluster_name)
pyinstance.run()
server_instance = server.serve(args.port, pyinstance)
+ prometheus_client.start_http_server(args.metrics_port)
+
global to_run
while to_run:
time.sleep(1)
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index e3e32fd..0ac3502 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -30,6 +30,7 @@ import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import io.prometheus.client.CollectorRegistry;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
@@ -75,7 +76,7 @@ public class ContextImplTest {
logger,
client,
new ArrayList<>(),
- new EnvironmentBasedSecretsProvider()
+ new EnvironmentBasedSecretsProvider(), new CollectorRegistry(),
new String[0]
);
}
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
index 80b3b1d..b927c49 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
@@ -56,7 +56,7 @@ public class JavaInstanceRunnableTest {
private JavaInstanceRunnable createRunnable(boolean addCustom, String
outputSerde) throws Exception {
InstanceConfig config = createInstanceConfig(addCustom, outputSerde);
JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
- config, null, null, null, null, null);
+ config, null, null, null, null, null, null);
return javaInstanceRunnable;
}
diff --git a/pulsar-functions/instance/src/test/python/test_python_instance.py
b/pulsar-functions/instance/src/test/python/test_python_instance.py
index 748e5d8..b865a9d 100644
--- a/pulsar-functions/instance/src/test/python/test_python_instance.py
+++ b/pulsar-functions/instance/src/test/python/test_python_instance.py
@@ -48,7 +48,7 @@ class TestContextImpl(unittest.TestCase):
pulsar_client.create_producer = Mock(return_value=producer)
user_code=__file__
consumers = None
- context_impl = ContextImpl(instance_config, logger, pulsar_client,
user_code, consumers, None)
+ context_impl = ContextImpl(instance_config, logger, pulsar_client,
user_code, consumers, None, None)
context_impl.publish("test_topic_name", "test_message")
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 98986dd..d5fb80e 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -29,8 +29,17 @@ import com.google.protobuf.util.JsonFormat;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.exporter.HTTPServer;
+import io.prometheus.client.hotspot.BufferPoolsExports;
+import io.prometheus.client.hotspot.ClassLoadingExports;
import io.prometheus.client.hotspot.DefaultExports;
+import io.prometheus.client.hotspot.GarbageCollectorExports;
+import io.prometheus.client.hotspot.MemoryPoolsExports;
+import io.prometheus.client.hotspot.StandardExports;
+import io.prometheus.client.hotspot.ThreadExports;
+import io.prometheus.client.hotspot.VersionInfoExports;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
@@ -43,12 +52,14 @@ import
org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.utils.Reflections;
import java.lang.reflect.Type;
+import java.net.InetSocketAddress;
import java.util.Map;
import java.util.TimerTask;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
/**
* A function container implemented using java thread.
@@ -99,6 +110,9 @@ public class JavaInstanceMain implements AutoCloseable {
@Parameter(names = "--port", description = "Port to listen on\n", required
= true)
protected int port;
+ @Parameter(names = "--metrics_port", description = "Port metrics will be
exposed on\n", required = true)
+ protected int metrics_port;
+
@Parameter(names = "--max_buffered_tuples", description = "Maximum number
of tuples to buffer\n", required = true)
protected int maxBufferedTuples;
@@ -111,11 +125,15 @@ public class JavaInstanceMain implements AutoCloseable {
@Parameter(names = "--secrets_provider_config", description = "The config
that needs to be passed to secrets provider", required = false)
protected String secretsProviderConfig;
+ @Parameter(names = "--cluster_name", description = "The name of the
cluster this instance is running on", required = true)
+ protected String clusterName;
+
private Server server;
private RuntimeSpawner runtimeSpawner;
private ThreadRuntimeFactory containerFactory;
private Long lastHealthCheckTs = null;
private ScheduledExecutorService timer;
+ private HTTPServer metricsServer;
public JavaInstanceMain() { }
@@ -126,6 +144,7 @@ public class JavaInstanceMain implements AutoCloseable {
instanceConfig.setFunctionVersion(functionVersion);
instanceConfig.setInstanceId(instanceId);
instanceConfig.setMaxBufferedTuples(maxBufferedTuples);
+ instanceConfig.setClusterName(clusterName);
FunctionDetails.Builder functionDetailsBuilder =
FunctionDetails.newBuilder();
if (functionDetailsJsonString.charAt(0) == '\'') {
functionDetailsJsonString = functionDetailsJsonString.substring(1);
@@ -162,6 +181,9 @@ public class JavaInstanceMain implements AutoCloseable {
}
secretsProvider.init(secretsProviderConfigMap);
+ // Collector Registry for prometheus metrics
+ CollectorRegistry collectorRegistry = new CollectorRegistry();
+
containerFactory = new ThreadRuntimeFactory("LocalRunnerThreadGroup",
pulsarServiceUrl,
stateStorageServiceUrl,
AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthenticationPlugin)
@@ -169,7 +191,7 @@ public class JavaInstanceMain implements AutoCloseable {
.tlsAllowInsecureConnection(isTrue(tlsAllowInsecureConnection))
.tlsHostnameVerificationEnable(isTrue(tlsHostNameVerificationEnabled))
.tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
- secretsProvider);
+ secretsProvider, collectorRegistry);
runtimeSpawner = new RuntimeSpawner(
instanceConfig,
jarFile,
@@ -195,12 +217,13 @@ public class JavaInstanceMain implements AutoCloseable {
}
});
- // registering jvm metrics to prometheus
- DefaultExports.initialize();
-
log.info("Starting runtimeSpawner");
runtimeSpawner.start();
+ // starting metrics server
+ log.info("Starting metrics server on port {}", metrics_port);
+ metricsServer = new HTTPServer(new InetSocketAddress(metrics_port),
collectorRegistry, true);
+
if (expectedHealthCheckInterval > 0) {
timer = Executors.newSingleThreadScheduledExecutor();
timer.scheduleAtFixedRate(new TimerTask() {
@@ -253,6 +276,9 @@ public class JavaInstanceMain implements AutoCloseable {
if (containerFactory != null) {
containerFactory.close();
}
+ if (metricsServer != null) {
+ metricsServer.stop();
+ }
} catch (Exception ex) {
System.err.println(ex);
}
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index 6e716f2..4dc97ac 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -88,9 +88,7 @@ class KubernetesRuntime implements Runtime {
private static final String ENV_SHARD_ID = "SHARD_ID";
private static final int maxJobNameSize = 55;
private static final Integer GRPC_PORT = 9093;
- private static final Integer PROMETHEUS_PORT = 9094;
- private static final Double prometheusMetricsServerCpu = 0.1;
- private static final Long prometheusMetricsServerRam = 125000000l;
+ private static final Integer METRICS_PORT = 9094;
public static final Pattern VALID_POD_NAME_REGEX =
Pattern.compile("[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*",
Pattern.CASE_INSENSITIVE);
@@ -110,7 +108,6 @@ class KubernetesRuntime implements Runtime {
// The thread that invokes the function
@Getter
private List<String> processArgs;
- private List<String> prometheusMetricsServerArgs;
@Getter
private ManagedChannel[] channel;
private InstanceControlGrpc.InstanceControlFutureStub[] stub;
@@ -190,8 +187,8 @@ class KubernetesRuntime implements Runtime {
secretsProviderConfig,
installUserCodeDependencies,
pythonDependencyRepository,
- pythonExtraDependencyRepository);
- this.prometheusMetricsServerArgs =
composePrometheusMetricsServerArgs(prometheusMetricsServerJarFile,
expectedMetricsInterval);
+ pythonExtraDependencyRepository,
+ METRICS_PORT);
running = false;
doChecks(instanceConfig.getFunctionDetails());
}
@@ -453,14 +450,6 @@ class KubernetesRuntime implements Runtime {
);
}
- protected List<String> getPrometheusMetricsServerCommand() {
- return Arrays.asList(
- "sh",
- "-c",
- String.join(" ", prometheusMetricsServerArgs)
- );
- }
-
private List<String> getDownloadCommand(String bkPath, String
userCodeFilePath) {
return Arrays.asList(
pulsarRootDir + "/bin/pulsar-admin",
@@ -525,15 +514,16 @@ class KubernetesRuntime implements Runtime {
private Map<String, String> getPrometheusAnnotations() {
final Map<String, String> annotations = new HashMap<>();
annotations.put("prometheus.io/scrape", "true");
- annotations.put("prometheus.io/port", String.valueOf(PROMETHEUS_PORT));
+ annotations.put("prometheus.io/port", String.valueOf(METRICS_PORT));
return annotations;
}
private Map<String, String> getLabels(Function.FunctionDetails
functionDetails) {
final Map<String, String> labels = new HashMap<>();
- labels.put("app", createJobName(functionDetails));
- labels.put("namespace", functionDetails.getNamespace());
+ labels.put("namespace",
String.format("%s/%s",functionDetails.getTenant(),
functionDetails.getNamespace()));
labels.put("tenant", functionDetails.getTenant());
+ labels.put("function", String.format("%s/%s/%s",
functionDetails.getTenant(),
+ functionDetails.getNamespace(), functionDetails.getName()));
if (customLabels != null && !customLabels.isEmpty()) {
labels.putAll(customLabels);
}
@@ -552,7 +542,6 @@ class KubernetesRuntime implements Runtime {
List<V1Container> containers = new LinkedList<>();
containers.add(getFunctionContainer(instanceCommand, resource));
- containers.add(getPrometheusContainer());
podSpec.containers(containers);
// Configure secrets
@@ -607,38 +596,6 @@ class KubernetesRuntime implements Runtime {
return container;
}
- private V1Container getPrometheusContainer() {
- final V1Container container = new
V1Container().name("prometheusmetricsserver");
-
- // set up the container images
- container.setImage(pulsarDockerImageName);
-
- // set up the container command
- container.setCommand(getPrometheusMetricsServerCommand());
-
- // setup the environment variables for the container
- final V1EnvVar envVarPodName = new V1EnvVar();
- envVarPodName.name("POD_NAME")
- .valueFrom(new V1EnvVarSource()
- .fieldRef(new V1ObjectFieldSelector()
- .fieldPath("metadata.name")));
- container.setEnv(Arrays.asList(envVarPodName));
-
-
- // set container resources
- final V1ResourceRequirements resourceRequirements = new
V1ResourceRequirements();
- final Map<String, Quantity> requests = new HashMap<>();
- requests.put("memory",
Quantity.fromString(Long.toString(prometheusMetricsServerRam)));
- requests.put("cpu",
Quantity.fromString(Double.toString(prometheusMetricsServerCpu)));
- resourceRequirements.setRequests(requests);
- container.setResources(resourceRequirements);
-
- // set container ports
- container.setPorts(getPrometheusContainerPorts());
-
- return container;
- }
-
private List<V1ContainerPort> getFunctionContainerPorts() {
List<V1ContainerPort> ports = new ArrayList<>();
final V1ContainerPort port = new V1ContainerPort();
@@ -652,7 +609,7 @@ class KubernetesRuntime implements Runtime {
List<V1ContainerPort> ports = new ArrayList<>();
final V1ContainerPort port = new V1ContainerPort();
port.setName("prometheus");
- port.setContainerPort(PROMETHEUS_PORT);
+ port.setContainerPort(METRICS_PORT);
ports.add(port);
return ports;
}
@@ -680,24 +637,4 @@ class KubernetesRuntime implements Runtime {
throw new RuntimeException("Kubernetes job name size should be
less than " + maxJobNameSize);
}
}
-
- private List<String> composePrometheusMetricsServerArgs(String
prometheusMetricsServerFile,
- Integer
expectedMetricsInterval) throws Exception {
- List<String> args = new LinkedList<>();
- args.add("java");
- args.add("-cp");
- args.add(prometheusMetricsServerFile);
-
args.add("-Dlog4j.configurationFile=prometheus_metricsserver_log4j2.yml");
- args.add("-Xmx" + String.valueOf(prometheusMetricsServerRam));
- args.add(PrometheusMetricsServer.class.getName());
- args.add("--function_details");
- args.add("'" +
JsonFormat.printer().omittingInsignificantWhitespace().print(instanceConfig.getFunctionDetails())
+ "'");
- args.add("--prometheus_port");
- args.add(String.valueOf(PROMETHEUS_PORT));
- args.add("--grpc_port");
- args.add(String.valueOf(GRPC_PORT));
- args.add("--collection_interval");
- args.add(String.valueOf(expectedMetricsInterval));
- return args;
- }
}
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
index 958dbcf..c198cc3 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
@@ -193,6 +193,7 @@ public class LocalRunner {
instanceConfig.setInstanceId(i + instanceIdOffset);
instanceConfig.setMaxBufferedTuples(1024);
instanceConfig.setPort(Utils.findAvailablePort());
+ instanceConfig.setClusterName("local");
RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
instanceConfig,
userCodeFile,
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index d06e90f..e39c7ad 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -36,6 +36,7 @@ import
org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import org.apache.pulsar.functions.proto.InstanceControlGrpc;
import
org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
+import org.apache.pulsar.functions.utils.Utils;
import java.io.InputStream;
import java.util.List;
@@ -117,7 +118,8 @@ class ProcessRuntime implements Runtime {
secretsProviderConfig,
false,
null,
- null);
+ null,
+ Utils.findAvailablePort());
}
/**
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index e84ec20..05f21e6 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -57,7 +57,8 @@ class RuntimeUtils {
String secretsProviderConfig,
Boolean
installUserCodeDepdendencies,
String pythonDependencyRepository,
- String
pythonExtraDependencyRepository) throws Exception {
+ String
pythonExtraDependencyRepository,
+ int metricsPort) throws Exception {
List<String> args = new LinkedList<>();
if (instanceConfig.getFunctionDetails().getRuntime() ==
Function.FunctionDetails.Runtime.JAVA) {
args.add("java");
@@ -160,6 +161,9 @@ class RuntimeUtils {
args.add("--port");
args.add(String.valueOf(grpcPort));
+ args.add("--metrics_port");
+ args.add(String.valueOf(metricsPort));
+
// state storage configs
if (null != stateStorageServiceUrl
&& instanceConfig.getFunctionDetails().getRuntime() ==
Function.FunctionDetails.Runtime.JAVA) {
@@ -177,6 +181,9 @@ class RuntimeUtils {
args.add("'" + secretsProviderConfig + "'");
}
}
+
+ args.add("--cluster_name");
+ args.add(instanceConfig.getClusterName());
return args;
}
}
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
index 5e42c52..913d4ae 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.functions.runtime;
import java.util.concurrent.CompletableFuture;
+import io.prometheus.client.CollectorRegistry;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.PulsarClient;
@@ -53,18 +54,28 @@ class ThreadRuntime implements Runtime {
String jarFile,
PulsarClient pulsarClient,
String stateStorageServiceUrl,
- SecretsProvider secretsProvider) {
+ SecretsProvider secretsProvider,
+ CollectorRegistry collectorRegistry) {
this.instanceConfig = instanceConfig;
if (instanceConfig.getFunctionDetails().getRuntime() !=
Function.FunctionDetails.Runtime.JAVA) {
throw new RuntimeException("Thread Container only supports Java
Runtime");
}
+
+ // if collector registry is not set, create one for this thread.
+ // since each thread / instance will needs its own collector registry
for metrics collection
+ CollectorRegistry instanceCollectorRegistry = collectorRegistry;
+ if (instanceCollectorRegistry == null) {
+ instanceCollectorRegistry = new CollectorRegistry();
+ }
+
this.javaInstanceRunnable = new JavaInstanceRunnable(
instanceConfig,
fnCache,
jarFile,
pulsarClient,
stateStorageServiceUrl,
- secretsProvider);
+ secretsProvider,
+ instanceCollectorRegistry);
this.threadGroup = threadGroup;
}
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
index 846028d..ed76117 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.functions.runtime;
import com.google.common.annotations.VisibleForTesting;
+import io.prometheus.client.CollectorRegistry;
import lombok.extern.slf4j.Slf4j;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
@@ -45,21 +46,23 @@ public class ThreadRuntimeFactory implements RuntimeFactory
{
private final PulsarClient pulsarClient;
private final String storageServiceUrl;
private final SecretsProvider secretsProvider;
+ private final CollectorRegistry collectorRegistry;
private volatile boolean closed;
public ThreadRuntimeFactory(String threadGroupName, String
pulsarServiceUrl, String storageServiceUrl,
- AuthenticationConfig authConfig,
SecretsProvider secretsProvider) throws Exception {
- this(threadGroupName, createPulsarClient(pulsarServiceUrl,
authConfig), storageServiceUrl, secretsProvider);
+ AuthenticationConfig authConfig,
SecretsProvider secretsProvider, CollectorRegistry collectorRegistry) throws
Exception {
+ this(threadGroupName, createPulsarClient(pulsarServiceUrl,
authConfig), storageServiceUrl, secretsProvider, collectorRegistry);
}
@VisibleForTesting
public ThreadRuntimeFactory(String threadGroupName, PulsarClient
pulsarClient, String storageServiceUrl,
- SecretsProvider secretsProvider) {
+ SecretsProvider secretsProvider,
CollectorRegistry collectorRegistry) {
this.secretsProvider = secretsProvider;
this.fnCache = new FunctionCacheManagerImpl();
this.threadGroup = new ThreadGroup(threadGroupName);
this.pulsarClient = pulsarClient;
this.storageServiceUrl = storageServiceUrl;
+ this.collectorRegistry = collectorRegistry;
}
private static PulsarClient createPulsarClient(String pulsarServiceUrl,
AuthenticationConfig authConfig)
@@ -94,7 +97,8 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
jarFile,
pulsarClient,
storageServiceUrl,
- secretsProvider);
+ secretsProvider,
+ collectorRegistry);
}
@Override
diff --git
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
index 2e00983..c22970e 100644
---
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
+++
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
@@ -192,6 +192,7 @@ public class KubernetesRuntimeTest {
config.setFunctionVersion("1.0");
config.setInstanceId(0);
config.setMaxBufferedTuples(1024);
+ config.setClusterName("standalone");
return config;
}
@@ -233,16 +234,19 @@ public class KubernetesRuntimeTest {
String classpath = javaInstanceJarFile;
String extraDepsEnv;
int portArg;
+ int metricsPortArg;
int totalArgs;
if (null != depsDir) {
extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" +
depsDir;
classpath = classpath + ":" + depsDir + "/*";
- totalArgs = 29;
+ totalArgs = 33;
portArg = 24;
+ metricsPortArg = 26;
} else {
extraDepsEnv = "";
portArg = 23;
- totalArgs = 28;
+ metricsPortArg = 25;
+ totalArgs = 32;
}
if (secretsAttached) {
totalArgs += 4;
@@ -263,13 +267,15 @@ public class KubernetesRuntimeTest {
+ " --function_version " + config.getFunctionVersion()
+ " --function_details '" +
JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
+ "' --pulsar_serviceurl " + pulsarServiceUrl
- + " --max_buffered_tuples 1024 --port " + args.get(portArg)
+ + " --max_buffered_tuples 1024 --port " + args.get(portArg) +
" --metrics_port " + args.get(metricsPortArg)
+ " --state_storage_serviceurl " + stateStorageServiceUrl
+ " --expected_healthcheck_interval -1";
if (secretsAttached) {
expectedArgs += " --secrets_provider
org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider"
+ " --secrets_provider_config
'{\"Somevalue\":\"myvalue\"}'";
}
+ expectedArgs += " --cluster_name standalone";
+
assertEquals(String.join(" ", args), expectedArgs);
}
@@ -301,15 +307,18 @@ public class KubernetesRuntimeTest {
int portArg;
String pythonPath;
int configArg;
+ int metricsPortArg;
if (null == extraDepsDir) {
- totalArgs = 32;
+ totalArgs = 36;
portArg = 29;
configArg = 9;
pythonPath = "";
+ metricsPortArg = 31;
} else {
- totalArgs = 33;
+ totalArgs = 37;
portArg = 30;
configArg = 10;
+ metricsPortArg = 32;
pythonPath = "PYTHONPATH=${PYTHONPATH}:" + extraDepsDir + " ";
}
if (secretsAttached) {
@@ -331,12 +340,13 @@ public class KubernetesRuntimeTest {
+ " --function_version " + config.getFunctionVersion()
+ " --function_details '" +
JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
+ "' --pulsar_serviceurl " + pulsarServiceUrl
- + " --max_buffered_tuples 1024 --port " + args.get(portArg)
+ + " --max_buffered_tuples 1024 --port " + args.get(portArg) +
" --metrics_port " + args.get(metricsPortArg)
+ " --expected_healthcheck_interval -1";
if (secretsAttached) {
expectedArgs += " --secrets_provider
secretsprovider.ClearTextSecretsProvider"
+ " --secrets_provider_config
'{\"Somevalue\":\"myvalue\"}'";
}
+ expectedArgs += " --cluster_name standalone";
assertEquals(String.join(" ", args), expectedArgs);
}
diff --git
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index 63875fd..323943c 100644
---
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -172,6 +172,7 @@ public class ProcessRuntimeTest {
config.setFunctionVersion("1.0");
config.setInstanceId(0);
config.setMaxBufferedTuples(1024);
+ config.setClusterName("standalone");
return config;
}
@@ -250,15 +251,18 @@ public class ProcessRuntimeTest {
String classpath = javaInstanceJarFile;
String extraDepsEnv;
int portArg;
+ int metricsPortArg;
if (null != depsDir) {
- assertEquals(args.size(), 33);
+ assertEquals(args.size(), 37);
extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" +
depsDir.toString();
classpath = classpath + ":" + depsDir + "/*";
portArg = 24;
+ metricsPortArg = 26;
} else {
- assertEquals(args.size(), 32);
+ assertEquals(args.size(), 36);
extraDepsEnv = "";
portArg = 23;
+ metricsPortArg = 25;
}
String expectedArgs = "java -cp " + classpath
@@ -273,11 +277,12 @@ public class ProcessRuntimeTest {
+ " --function_version " + config.getFunctionVersion()
+ " --function_details '" +
JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
+ "' --pulsar_serviceurl " + pulsarServiceUrl
- + " --max_buffered_tuples 1024 --port " + args.get(portArg)
+ + " --max_buffered_tuples 1024 --port " + args.get(portArg) +
" --metrics_port " + args.get(metricsPortArg)
+ " --state_storage_serviceurl " + stateStorageServiceUrl
+ " --expected_healthcheck_interval 30"
+ " --secrets_provider
org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider"
- + " --secrets_provider_config '{\"Config\":\"Value\"}'";
+ + " --secrets_provider_config '{\"Config\":\"Value\"}'"
+ + " --cluster_name standalone";
assertEquals(String.join(" ", args), expectedArgs);
}
@@ -305,8 +310,9 @@ public class ProcessRuntimeTest {
ProcessRuntime container = factory.createContainer(config,
userJarFile, null, 30l);
List<String> args = container.getProcessArgs();
- int totalArgs = 30;
+ int totalArgs = 34;
int portArg = 23;
+ int metricsPortArg = 25;
String pythonPath = "";
int configArg = 9;
@@ -319,10 +325,11 @@ public class ProcessRuntimeTest {
+ " --function_version " + config.getFunctionVersion()
+ " --function_details '" +
JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
+ "' --pulsar_serviceurl " + pulsarServiceUrl
- + " --max_buffered_tuples 1024 --port " + args.get(portArg)
+ + " --max_buffered_tuples 1024 --port " + args.get(portArg) +
" --metrics_port " + args.get(metricsPortArg)
+ " --expected_healthcheck_interval 30"
+ " --secrets_provider
secretsprovider.ClearTextSecretsProvider"
- + " --secrets_provider_config '{\"Config\":\"Value\"}'";
+ + " --secrets_provider_config '{\"Config\":\"Value\"}'"
+ + " --cluster_name standalone";
assertEquals(String.join(" ", args), expectedArgs);
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index 4f8d04a..46343a5 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -172,6 +172,7 @@ public class FunctionActioner implements AutoCloseable {
instanceConfig.setInstanceId(instanceId);
instanceConfig.setMaxBufferedTuples(1024);
instanceConfig.setPort(org.apache.pulsar.functions.utils.Utils.findAvailablePort());
+
instanceConfig.setClusterName(workerConfig.getPulsarFunctionsCluster());
log.info("{}/{}/{}-{} start process with instance config {}",
functionDetails.getTenant(), functionDetails.getNamespace(),
functionDetails.getName(), instanceId, instanceConfig);
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 25b97ce..8a8fafa 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -36,6 +36,7 @@ import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
+import io.prometheus.client.CollectorRegistry;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
import org.apache.distributedlog.api.namespace.Namespace;
@@ -130,7 +131,8 @@ public class FunctionRuntimeManager implements
AutoCloseable{
workerConfig.getPulsarServiceUrl(),
workerConfig.getStateStorageServiceUrl(),
authConfig,
- new ClearTextSecretsProvider());
+ new ClearTextSecretsProvider(),
+ null);
} else if (workerConfig.getProcessContainerFactory() != null) {
this.runtimeFactory = new ProcessRuntimeFactory(
workerConfig.getPulsarServiceUrl(),
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
index e497ff3..d2f1504 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
@@ -73,13 +73,13 @@ public class FunctionsStatsGenerator {
int instanceId =
functionRuntimeInfo.getFunctionInstance().getInstanceId();
String qualifiedNamespace =
String.format("%s/%s", tenant, namespace);
- metric(out, cluster, qualifiedNamespace, name,
String.format("%scount", metricName),
+ metric(out, cluster, qualifiedNamespace, name,
String.format("%s_count", metricName),
instanceId, dataDigest.getCount());
- metric(out, cluster, qualifiedNamespace, name,
String.format("%smax", metricName),
+ metric(out, cluster, qualifiedNamespace, name,
String.format("%s_max", metricName),
instanceId, dataDigest.getMax());
- metric(out, cluster, qualifiedNamespace,name,
String.format("%smin", metricName),
+ metric(out, cluster, qualifiedNamespace,name,
String.format("%s_min", metricName),
instanceId, dataDigest.getMin());
- metric(out, cluster, qualifiedNamespace, name,
String.format("%ssum", metricName),
+ metric(out, cluster, qualifiedNamespace, name,
String.format("%s_sum", metricName),
instanceId, dataDigest.getSum());
}
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
index e35aa2b..c957de7 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
@@ -87,10 +87,10 @@ public class FunctionStatsGeneratorTest {
CompletableFuture<InstanceCommunication.MetricsData>
metricsDataCompletableFuture = new CompletableFuture<>();
InstanceCommunication.MetricsData metricsData =
InstanceCommunication.MetricsData.newBuilder()
.putMetrics(
- "__function_total_processed__",
+ "pulsar_function_processed_total",
InstanceCommunication.MetricsData.DataDigest.newBuilder()
.setCount(100.0).setMax(200.0).setSum(300.0).setMin(0.0).build())
- .putMetrics("__function_process_latency_ms__",
+ .putMetrics("pulsar_function_process_latency_ms",
InstanceCommunication.MetricsData.DataDigest.newBuilder()
.setCount(10.0).setMax(20.0).setSum(30.0).setMin(0.0).build())
.build();
@@ -127,56 +127,56 @@ public class FunctionStatsGeneratorTest {
Assert.assertEquals(metrics.size(), 8);
System.out.println("metrics: " + metrics);
- Metric m = metrics.get("__function_total_processed__count");
+ Metric m = metrics.get("pulsar_function_processed_total_count");
assertEquals(m.tags.get("cluster"), "default");
assertEquals(m.tags.get("instanceId"), "0");
assertEquals(m.tags.get("name"), "func-1");
assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
assertEquals(m.value, 100.0);
- m = metrics.get("__function_total_processed__max");
+ m = metrics.get("pulsar_function_processed_total_max");
assertEquals(m.tags.get("cluster"), "default");
assertEquals(m.tags.get("instanceId"), "0");
assertEquals(m.tags.get("name"), "func-1");
assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
assertEquals(m.value, 200.0);
- m = metrics.get("__function_total_processed__sum");
+ m = metrics.get("pulsar_function_processed_total_sum");
assertEquals(m.tags.get("cluster"), "default");
assertEquals(m.tags.get("instanceId"), "0");
assertEquals(m.tags.get("name"), "func-1");
assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
assertEquals(m.value, 300.0);
- m = metrics.get("__function_total_processed__min");
+ m = metrics.get("pulsar_function_processed_total_min");
assertEquals(m.tags.get("cluster"), "default");
assertEquals(m.tags.get("instanceId"), "0");
assertEquals(m.tags.get("name"), "func-1");
assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
assertEquals(m.value, 0.0);
- m = metrics.get("__function_process_latency_ms__count");
+ m = metrics.get("pulsar_function_process_latency_ms_count");
assertEquals(m.tags.get("cluster"), "default");
assertEquals(m.tags.get("instanceId"), "0");
assertEquals(m.tags.get("name"), "func-1");
assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
assertEquals(m.value, 10.0);
- m = metrics.get("__function_process_latency_ms__max");
+ m = metrics.get("pulsar_function_process_latency_ms_max");
assertEquals(m.tags.get("cluster"), "default");
assertEquals(m.tags.get("instanceId"), "0");
assertEquals(m.tags.get("name"), "func-1");
assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
assertEquals(m.value, 20.0);
- m = metrics.get("__function_process_latency_ms__sum");
+ m = metrics.get("pulsar_function_process_latency_ms_sum");
assertEquals(m.tags.get("cluster"), "default");
assertEquals(m.tags.get("instanceId"), "0");
assertEquals(m.tags.get("name"), "func-1");
assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
assertEquals(m.value, 30.0);
- m = metrics.get("__function_process_latency_ms__min");
+ m = metrics.get("pulsar_function_process_latency_ms_min");
assertEquals(m.tags.get("cluster"), "default");
assertEquals(m.tags.get("instanceId"), "0");
assertEquals(m.tags.get("name"), "func-1");
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 066ca81..ffd1bd6 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.functions.worker;
import com.google.common.collect.Sets;
import com.google.protobuf.InvalidProtocolBufferException;
import io.netty.util.concurrent.DefaultThreadFactory;
+import io.prometheus.client.CollectorRegistry;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.MessageId;
@@ -128,7 +129,7 @@ public class SchedulerManagerTest {
public void stop() {
this.executor.shutdown();
}
-
+
@Test
public void testSchedule() throws Exception {
@@ -141,7 +142,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function1);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null,
"dummy", new ClearTextSecretsProvider());
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null,
"dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
@@ -187,7 +188,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function1);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null,
"dummy", new ClearTextSecretsProvider());
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null,
"dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
@@ -234,7 +235,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function2);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null,
"dummy", new ClearTextSecretsProvider());
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null,
"dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
@@ -294,7 +295,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function1);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null,
"dummy", new ClearTextSecretsProvider());
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null,
"dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
@@ -359,7 +360,8 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function2);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null,
"dummy", new ClearTextSecretsProvider());
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null,
"dummy", new ClearTextSecretsProvider
+ (), new CollectorRegistry());
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
@@ -470,7 +472,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function2);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null,
"dummy", new ClearTextSecretsProvider());
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null,
"dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
@@ -597,7 +599,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function2);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null,
"dummy", new ClearTextSecretsProvider());
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null,
"dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
Map<String, Map<String, Function.Assignment>> currentAssignments = new
HashMap<>();
@@ -651,7 +653,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function2);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null,
"dummy", new ClearTextSecretsProvider());
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null,
"dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
@@ -784,7 +786,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function2);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null,
"dummy", new ClearTextSecretsProvider());
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null,
"dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments