jerrypeng closed pull request #2930: Expose metrics via http port in function
instance
URL: https://github.com/apache/pulsar/pull/2930
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork once
it has been merged, the diff is reproduced below for the sake of
provenance:
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 406fe13db6..d13964df34 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -18,25 +18,12 @@
*/
package org.apache.pulsar.functions.instance;
-import static com.google.common.base.Preconditions.checkState;
-
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Summary;
import lombok.Getter;
import lombok.Setter;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
@@ -55,9 +42,24 @@
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkState;
+
/**
* This class implements the Context interface exposed to the user.
*/
+
class ContextImpl implements Context, SinkContext, SourceContext {
private InstanceConfig config;
private Logger logger;
@@ -92,7 +94,6 @@ public void update(double value) {
}
}
- private ConcurrentMap<String, AccumulatedMetricDatum>
currentAccumulatedMetrics;
private ConcurrentMap<String, AccumulatedMetricDatum> accumulatedMetrics;
private Map<String, Producer<?>> publishProducers;
@@ -110,11 +111,21 @@ public void update(double value) {
private StateContextImpl stateContext;
private Map<String, Object> userConfigs;
+ Map<String, String[]> userMetricsLabels = new HashMap<>();
+ private final String[] metricsLabels;
+ private final Summary userMetricsSummary;
+
+ private final static String[] userMetricsLabelNames;
+ static {
+ // add label to indicate user metric
+ userMetricsLabelNames = Arrays.copyOf(FunctionStats.metricsLabelNames,
FunctionStats.metricsLabelNames.length + 1);
+ userMetricsLabelNames[FunctionStats.metricsLabelNames.length] =
"metric";
+ }
+
public ContextImpl(InstanceConfig config, Logger logger, PulsarClient
client, List<String> inputTopics,
- SecretsProvider secretsProvider) {
+ SecretsProvider secretsProvider, CollectorRegistry
collectorRegistry, String[] metricsLabels) {
this.config = config;
this.logger = logger;
- this.currentAccumulatedMetrics = new ConcurrentHashMap<>();
this.accumulatedMetrics = new ConcurrentHashMap<>();
this.publishProducers = new HashMap<>();
this.inputTopics = inputTopics;
@@ -138,6 +149,17 @@ public ContextImpl(InstanceConfig config, Logger logger,
PulsarClient client, Li
} else {
secretsMap = new HashMap<>();
}
+
+ this.metricsLabels = metricsLabels;
+ this.userMetricsSummary = Summary.build()
+ .name("pulsar_function_user_metric")
+ .help("Pulsar Function user defined metric.")
+ .labelNames(userMetricsLabelNames)
+ .quantile(0.5, 0.01)
+ .quantile(0.9, 0.01)
+ .quantile(0.99, 0.01)
+ .quantile(0.999, 0.01)
+ .register(collectorRegistry);
}
public void setCurrentMessageContext(Record<?> record) {
@@ -320,8 +342,16 @@ public ByteBuffer getState(String key) {
@Override
public void recordMetric(String metricName, double value) {
- currentAccumulatedMetrics.putIfAbsent(metricName, new
AccumulatedMetricDatum());
- currentAccumulatedMetrics.get(metricName).update(value);
+ userMetricsLabels.computeIfAbsent(metricName,
+ s -> {
+ String[] userMetricLabels = Arrays.copyOf(metricsLabels,
metricsLabels.length + 1);
+ userMetricLabels[userMetricLabels.length - 1] = metricName;
+ return userMetricLabels;
+ });
+
+
userMetricsSummary.labels(userMetricsLabels.get(metricName)).observe(value);
+ accumulatedMetrics.putIfAbsent(metricName, new
AccumulatedMetricDatum());
+ accumulatedMetrics.get(metricName).update(value);
}
public MetricsData getAndResetMetrics() {
@@ -331,9 +361,8 @@ public MetricsData getAndResetMetrics() {
}
public void resetMetrics() {
+ userMetricsSummary.clear();
this.accumulatedMetrics.clear();
- this.accumulatedMetrics.putAll(currentAccumulatedMetrics);
- this.currentAccumulatedMetrics.clear();
}
public MetricsData getMetrics() {
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java
index 15b01f7703..c45699722e 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java
@@ -21,6 +21,7 @@
import com.google.common.collect.EvictingQueue;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
import io.prometheus.client.Summary;
import lombok.Getter;
import lombok.Setter;
@@ -35,7 +36,16 @@
@Setter
public class FunctionStats {
- private static final String[] metricsLabelNames = {"tenant", "namespace",
"name", "instance_id"};
+ static final String[] metricsLabelNames = {"tenant", "namespace",
"function", "instance_id", "cluster"};
+
+ /** Declare metric names **/
+ static final String PULSAR_FUNCTION_PROCESSED_TOTAL =
"pulsar_function_processed_total";
+ static final String PULSAR_FUNCTION_PROCESSED_SUCCESSFULLY_TOTAL =
"pulsar_function_processed_successfully_total";
+ static final String PULSAR_FUNCTION_SYSTEM_EXCEPTIONS_TOTAL =
"pulsar_function_system_exceptions_total";
+ static final String PULSAR_FUNCTION_USER_EXCEPTIONS_TOTAL =
"pulsar_function_user_exceptions_total";
+ static final String PULSAR_FUNCTION_PROCESS_LATENCY_MS =
"pulsar_function_process_latency_ms";
+ static final String PULSAR_FUNCTION_LAST_INVOCATION =
"pulsar_function_last_invocation";
+ static final String PULSAR_FUNCTION_RECEIVED_TOTAL =
"pulsar_function_received_total";
/** Declare Prometheus stats **/
@@ -49,6 +59,10 @@
final Summary statProcessLatency;
+ final Gauge statlastInvocation;
+
+ final Counter statTotalRecordsRecieved;
+
CollectorRegistry functionCollectorRegistry;
@Getter
@@ -56,47 +70,56 @@
@Getter
private
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
latestSystemExceptions = EvictingQueue.create(10);
- @Getter
- @Setter
- private long lastInvocationTime = 0;
-
- public FunctionStats() {
+ public FunctionStats(CollectorRegistry collectorRegistry) {
// Declare function local collector registry so that it will not clash
with other function instances'
// metrics collection especially in threaded mode
functionCollectorRegistry = new CollectorRegistry();
statTotalProcessed = Counter.build()
- .name("__function_total_processed__")
+ .name(PULSAR_FUNCTION_PROCESSED_TOTAL)
.help("Total number of messages processed.")
.labelNames(metricsLabelNames)
- .register(functionCollectorRegistry);
+ .register(collectorRegistry);
statTotalProcessedSuccessfully = Counter.build()
- .name("__function_total_successfully_processed__")
+ .name(PULSAR_FUNCTION_PROCESSED_SUCCESSFULLY_TOTAL)
.help("Total number of messages processed successfully.")
.labelNames(metricsLabelNames)
- .register(functionCollectorRegistry);
+ .register(collectorRegistry);
statTotalSysExceptions = Counter.build()
- .name("__function_total_system_exceptions__")
+ .name(PULSAR_FUNCTION_SYSTEM_EXCEPTIONS_TOTAL)
.help("Total number of system exceptions.")
.labelNames(metricsLabelNames)
- .register(functionCollectorRegistry);
+ .register(collectorRegistry);
statTotalUserExceptions = Counter.build()
- .name("__function_total_user_exceptions__")
+ .name(PULSAR_FUNCTION_USER_EXCEPTIONS_TOTAL)
.help("Total number of user exceptions.")
.labelNames(metricsLabelNames)
- .register(functionCollectorRegistry);
+ .register(collectorRegistry);
statProcessLatency = Summary.build()
- .name("__function_process_latency_ms__").help("Process latency
in milliseconds.")
+ .name(PULSAR_FUNCTION_PROCESS_LATENCY_MS)
+ .help("Process latency in milliseconds.")
.quantile(0.5, 0.01)
.quantile(0.9, 0.01)
.quantile(0.99, 0.01)
.quantile(0.999, 0.01)
.labelNames(metricsLabelNames)
- .register(functionCollectorRegistry);
+ .register(collectorRegistry);
+
+ statlastInvocation = Gauge.build()
+ .name(PULSAR_FUNCTION_LAST_INVOCATION)
+ .help("The timestamp of the last invocation of the function")
+ .labelNames(metricsLabelNames)
+ .register(collectorRegistry);
+
+ statTotalRecordsRecieved = Counter.build()
+ .name(PULSAR_FUNCTION_RECEIVED_TOTAL)
+ .help("Total number of messages received from source.")
+ .labelNames(metricsLabelNames)
+ .register(collectorRegistry);
}
public void addUserException(Exception ex) {
@@ -120,8 +143,9 @@ public void reset() {
statTotalSysExceptions.clear();
statTotalUserExceptions.clear();
statProcessLatency.clear();
+ statlastInvocation.clear();
+ statTotalRecordsRecieved.clear();
latestUserExceptions.clear();
latestSystemExceptions.clear();
- lastInvocationTime = 0;
}
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
index 040af91241..51ec0d488c 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
@@ -41,6 +41,7 @@
private FunctionDetails functionDetails;
private int maxBufferedTuples;
private int port;
+ private String clusterName;
/**
* Get the string representation of {@link #getInstanceId()}.
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 9e5ae51807..ab900ac40c 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -22,6 +22,7 @@
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import io.netty.buffer.ByteBuf;
+import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Summary;
import lombok.AccessLevel;
import lombok.Getter;
@@ -45,6 +46,8 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.functions.ConsumerConfig;
+import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.state.StateContextImpl;
@@ -58,8 +61,6 @@
import org.apache.pulsar.functions.sink.PulsarSinkDisable;
import org.apache.pulsar.functions.source.PulsarSource;
import org.apache.pulsar.functions.source.PulsarSourceConfig;
-import org.apache.pulsar.common.functions.ConsumerConfig;
-import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.StateUtils;
@@ -116,6 +117,8 @@
private Sink sink;
private final SecretsProvider secretsProvider;
+
+ private CollectorRegistry collectorRegistry;
private final String[] metricsLabels;
public JavaInstanceRunnable(InstanceConfig instanceConfig,
@@ -123,19 +126,25 @@ public JavaInstanceRunnable(InstanceConfig instanceConfig,
String jarFile,
PulsarClient pulsarClient,
String stateStorageServiceUrl,
- SecretsProvider secretsProvider) {
+ SecretsProvider secretsProvider,
+ CollectorRegistry collectorRegistry) {
this.instanceConfig = instanceConfig;
this.fnCache = fnCache;
this.jarFile = jarFile;
this.client = (PulsarClientImpl) pulsarClient;
this.stateStorageServiceUrl = stateStorageServiceUrl;
- this.stats = new FunctionStats();
+ this.stats = new FunctionStats(collectorRegistry);
this.secretsProvider = secretsProvider;
+ this.collectorRegistry = collectorRegistry;
this.metricsLabels = new String[]{
instanceConfig.getFunctionDetails().getTenant(),
- instanceConfig.getFunctionDetails().getNamespace(),
- instanceConfig.getFunctionDetails().getName(),
- String.valueOf(instanceConfig.getInstanceId())
+ String.format("%s/%s",
instanceConfig.getFunctionDetails().getTenant(),
+ instanceConfig.getFunctionDetails().getNamespace()),
+ String.format("%s/%s/%s",
instanceConfig.getFunctionDetails().getTenant(),
+ instanceConfig.getFunctionDetails().getNamespace(),
+ instanceConfig.getFunctionDetails().getName()),
+ String.valueOf(instanceConfig.getInstanceId()),
+ instanceConfig.getClusterName()
};
}
@@ -181,7 +190,7 @@ ContextImpl setupContext() {
}
Logger instanceLog = LoggerFactory.getLogger(
"function-" + instanceConfig.getFunctionDetails().getName());
- return new ContextImpl(instanceConfig, instanceLog, client,
inputTopics, secretsProvider);
+ return new ContextImpl(instanceConfig, instanceLog, client,
inputTopics, secretsProvider, collectorRegistry, metricsLabels);
}
/**
@@ -201,6 +210,9 @@ public void run() {
while (true) {
currentRecord = readInput();
+ // increment number of records received from source
+ stats.statTotalRecordsRecieved.labels(metricsLabels).inc();
+
if
(instanceConfig.getFunctionDetails().getProcessingGuarantees() ==
org.apache.pulsar.functions
.proto.Function.ProcessingGuarantees.ATMOST_ONCE) {
if (instanceConfig.getFunctionDetails().getAutoAck()) {
@@ -212,7 +224,7 @@ public void run() {
JavaExecutionResult result;
// set last invocation time
- stats.setLastInvocationTime(System.currentTimeMillis());
+
stats.statlastInvocation.labels(metricsLabels).set(System.currentTimeMillis());
// start time for process latency stat
Summary.Timer requestTimer =
stats.statProcessLatency.labels(metricsLabels).startTimer();
@@ -331,7 +343,6 @@ private void processResult(Record srcRecord,
stats.addUserException(result.getUserException() );
srcRecord.fail();
} else {
- stats.statTotalProcessedSuccessfully.labels(metricsLabels).inc();
if (result.getResult() != null) {
sendOutputMessage(srcRecord, result.getResult());
} else {
@@ -340,6 +351,8 @@ private void processResult(Record srcRecord,
srcRecord.ack();
}
}
+ // increment total successfully processed
+ stats.statTotalProcessedSuccessfully.labels(metricsLabels).inc();
}
}
@@ -439,22 +452,24 @@ public void resetMetrics() {
private Builder createMetricsDataBuilder() {
InstanceCommunication.MetricsData.Builder bldr =
InstanceCommunication.MetricsData.newBuilder();
- addSystemMetrics("__total_processed__",
stats.statTotalProcessed.labels(metricsLabels).get(), bldr);
- addSystemMetrics("__total_successfully_processed__",
stats.statTotalProcessedSuccessfully.labels(metricsLabels).get(), bldr);
- addSystemMetrics("__total_system_exceptions__",
stats.statTotalSysExceptions.labels(metricsLabels).get(), bldr);
- addSystemMetrics("__total_user_exceptions__",
stats.statTotalUserExceptions.labels(metricsLabels).get(), bldr);
- addSystemMetrics("__avg_latency_ms__",
+ addSystemMetrics(FunctionStats.PULSAR_FUNCTION_PROCESSED_TOTAL,
stats.statTotalProcessed.labels(metricsLabels).get(), bldr);
+
addSystemMetrics(FunctionStats.PULSAR_FUNCTION_PROCESSED_SUCCESSFULLY_TOTAL,
stats.statTotalProcessedSuccessfully.labels(metricsLabels).get(), bldr);
+
addSystemMetrics(FunctionStats.PULSAR_FUNCTION_SYSTEM_EXCEPTIONS_TOTAL,
stats.statTotalSysExceptions.labels(metricsLabels).get(), bldr);
+ addSystemMetrics(FunctionStats.PULSAR_FUNCTION_USER_EXCEPTIONS_TOTAL,
stats.statTotalUserExceptions.labels(metricsLabels).get(), bldr);
+ addSystemMetrics(FunctionStats.PULSAR_FUNCTION_RECEIVED_TOTAL,
stats.statTotalRecordsRecieved.labels(metricsLabels).get(), bldr);
+ addSystemMetrics(FunctionStats.PULSAR_FUNCTION_PROCESS_LATENCY_MS,
stats.statProcessLatency.labels(metricsLabels).get().count <=
0.0
? 0 :
stats.statProcessLatency.labels(metricsLabels).get().sum /
stats.statProcessLatency.labels(metricsLabels).get().count,
bldr);
+ addSystemMetrics(FunctionStats.PULSAR_FUNCTION_LAST_INVOCATION,
stats.statlastInvocation.labels(metricsLabels).get(), bldr);
return bldr;
}
public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() {
InstanceCommunication.FunctionStatus.Builder functionStatusBuilder =
InstanceCommunication.FunctionStatus.newBuilder();
-
functionStatusBuilder.setNumProcessed((long)stats.statTotalProcessed.labels(metricsLabels).get());
-
functionStatusBuilder.setNumSuccessfullyProcessed((long)stats.statTotalProcessedSuccessfully.labels(metricsLabels).get());
-
functionStatusBuilder.setNumUserExceptions((long)stats.statTotalUserExceptions.labels(metricsLabels).get());
+ functionStatusBuilder.setNumProcessed((long)
stats.statTotalProcessed.labels(metricsLabels).get());
+ functionStatusBuilder.setNumSuccessfullyProcessed((long)
stats.statTotalProcessedSuccessfully.labels(metricsLabels).get());
+ functionStatusBuilder.setNumUserExceptions((long)
stats.statTotalUserExceptions.labels(metricsLabels).get());
stats.getLatestUserExceptions().forEach(ex -> {
functionStatusBuilder.addLatestUserExceptions(ex);
});
@@ -464,8 +479,9 @@ private Builder createMetricsDataBuilder() {
});
functionStatusBuilder.setAverageLatency(
stats.statProcessLatency.labels(metricsLabels).get().count ==
0.0
- ? 0 :
stats.statProcessLatency.labels(metricsLabels).get().sum /
stats.statProcessLatency.labels(metricsLabels).get().count);
-
functionStatusBuilder.setLastInvocationTime(stats.getLastInvocationTime());
+ ? 0 :
stats.statProcessLatency.labels(metricsLabels).get().sum /
stats.statProcessLatency
+ .labels(metricsLabels).get().count);
+ functionStatusBuilder.setLastInvocationTime((long)
stats.statlastInvocation.labels(metricsLabels).get());
return functionStatusBuilder;
}
diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py
b/pulsar-functions/instance/src/main/python/contextimpl.py
index 83a63aa98a..9a37d59ab1 100644
--- a/pulsar-functions/instance/src/main/python/contextimpl.py
+++ b/pulsar-functions/instance/src/main/python/contextimpl.py
@@ -31,6 +31,9 @@
import util
import InstanceCommunication_pb2
+from prometheus_client import Summary
+from function_stats import Stats
+
# For keeping track of accumulated metrics
class AccumulatedMetricDatum(object):
def __init__(self):
@@ -39,7 +42,7 @@ def __init__(self):
self.max = float('-inf')
self.min = float('inf')
- def record(self, value):
+ def update(self, value):
self.count += 1
self.sum += value
if value > self.max:
@@ -48,14 +51,17 @@ def record(self, value):
self.min = value
class ContextImpl(pulsar.Context):
- def __init__(self, instance_config, logger, pulsar_client, user_code,
consumers, secrets_provider):
+
+ # add label to indicate user metric
+ user_metrics_label_names = Stats.metrics_label_names + ["metric"]
+
+ def __init__(self, instance_config, logger, pulsar_client, user_code,
consumers, secrets_provider, metrics_labels):
self.instance_config = instance_config
self.log = logger
self.pulsar_client = pulsar_client
self.user_code_dir = os.path.dirname(user_code)
self.consumers = consumers
self.secrets_provider = secrets_provider
- self.current_accumulated_metrics = {}
self.accumulated_metrics = {}
self.publish_producers = {}
self.publish_serializers = {}
@@ -69,6 +75,12 @@ def __init__(self, instance_config, logger, pulsar_client,
user_code, consumers,
if instance_config.function_details.secretsMap \
else {}
+ self.metrics_labels = metrics_labels
+ self.user_metrics_labels = dict()
+ self.user_metrics_summary = Summary("pulsar_function_user_metric",
+ 'Pulsar Function user defined metric',
+ ContextImpl.user_metrics_label_names)
+
# Called on a per message basis to set the context for the current message
def set_current_message_context(self, msgid, topic):
self.current_message_id = msgid
@@ -117,9 +129,12 @@ def get_secret(self, secret_key):
return self.secrets_provider.provide_secret(secret_key,
self.secrets_map[secret_key])
def record_metric(self, metric_name, metric_value):
- if not metric_name in self.current_accumulated_metrics:
- self.current_accumulated_metrics[metric_name] = AccumulatedMetricDatum()
- self.current_accumulated_metrics[metric_name].update(metric_value)
+ if metric_name not in self.user_metrics_labels:
+ self.user_metrics_labels[metric_name] = self.metrics_labels +
[metric_name]
+
self.user_metrics_summary.labels(*self.user_metrics_labels[metric_name]).observe(metric_value)
+ if not metric_name in self.accumulated_metrics:
+ self.accumulated_metrics[metric_name] = AccumulatedMetricDatum()
+ self.accumulated_metrics[metric_name].update(metric_value)
def get_output_topic(self):
return self.instance_config.function_details.output
@@ -164,17 +179,16 @@ def get_and_reset_metrics(self):
def reset_metrics(self):
# TODO: Make it thread safe
+ for labels in self.user_metrics_labels.values():
+ self.user_metrics_summary.labels(*labels)._sum.set(0.0)
+ self.user_metrics_summary.labels(*labels)._count.set(0.0)
self.accumulated_metrics.clear()
- self.accumulated_metrics.update(self.current_accumulated_metrics)
- self.current_accumulated_metrics.clear()
def get_metrics(self):
metrics = InstanceCommunication_pb2.MetricsData()
for metric_name, accumulated_metric in self.accumulated_metrics.items():
- m = InstanceCommunication_pb2.MetricsData.DataDigest()
- m.count = accumulated_metric.count
- m.sum = accumulated_metric.sum
- m.max = accumulated_metric.max
- m.min = accumulated_metric.min
- metrics.metrics[metric_name] = m
+ metrics.metrics[metric_name].count = accumulated_metric.count
+ metrics.metrics[metric_name].sum = accumulated_metric.sum
+ metrics.metrics[metric_name].max = accumulated_metric.max
+ metrics.metrics[metric_name].min = accumulated_metric.min
return metrics
diff --git a/pulsar-functions/instance/src/main/python/function_stats.py
b/pulsar-functions/instance/src/main/python/function_stats.py
new file mode 100644
index 0000000000..13b3f8442f
--- /dev/null
+++ b/pulsar-functions/instance/src/main/python/function_stats.py
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import traceback
+import time
+
+from prometheus_client import Counter, Summary, Gauge
+
+# We keep track of the following metrics
+class Stats(object):
+ metrics_label_names = ['tenant', 'namespace', 'function', 'instance_id',
'cluster']
+
+ TOTAL_PROCESSED = 'pulsar_function_processed_total'
+ TOTAL_SUCCESSFULLY_PROCESSED = 'pulsar_function_processed_successfully_total'
+ TOTAL_SYSTEM_EXCEPTIONS = 'pulsar_function_system_exceptions_total'
+ TOTAL_USER_EXCEPTIONS = 'pulsar_function_user_exceptions_total'
+ PROCESS_LATENCY_MS = 'pulsar_function_process_latency_ms'
+ LAST_INVOCATION = 'pulsar_function_last_invocation'
+ TOTAL_RECEIVED = 'pulsar_function_received_total'
+
+ # Declare Prometheus
+ stat_total_processed = Counter(TOTAL_PROCESSED, 'Total number of messages
processed.', metrics_label_names)
+ stat_total_processed_successfully = Counter(TOTAL_SUCCESSFULLY_PROCESSED,
+ 'Total number of messages
processed successfully.', metrics_label_names)
+ stat_total_sys_exceptions = Counter(TOTAL_SYSTEM_EXCEPTIONS, 'Total number
of system exceptions.',
+ metrics_label_names)
+ stat_total_user_exceptions = Counter(TOTAL_USER_EXCEPTIONS, 'Total number of
user exceptions.',
+ metrics_label_names)
+
+ stat_process_latency_ms = Summary(PROCESS_LATENCY_MS, 'Process latency in
milliseconds.', metrics_label_names)
+
+ stat_last_invocation = Gauge(LAST_INVOCATION, 'The timestamp of the last
invocation of the function.', metrics_label_names)
+
+ stat_total_received = Counter(TOTAL_RECEIVED, 'Total number of messages
received from source.', metrics_label_names)
+
+ latest_user_exception = []
+ latest_sys_exception = []
+
+ def add_user_exception(self):
+ self.latest_sys_exception.append((traceback.format_exc(), int(time.time()
* 1000)))
+ if len(self.latest_sys_exception) > 10:
+ self.latest_sys_exception.pop(0)
+
+ def add_sys_exception(self):
+ self.latest_sys_exception.append((traceback.format_exc(), int(time.time()
* 1000)))
+ if len(self.latest_sys_exception) > 10:
+ self.latest_sys_exception.pop(0)
+
+ def reset(self, metrics_labels):
+ self.latest_user_exception = []
+ self.latest_sys_exception = []
+ self.stat_total_processed.labels(*metrics_labels)._value.set(0.0)
+
self.stat_total_processed_successfully.labels(*metrics_labels)._value.set(0.0)
+ self.stat_total_user_exceptions.labels(*metrics_labels)._value.set(0.0)
+ self.stat_total_sys_exceptions.labels(*metrics_labels)._value.set(0.0)
+ self.stat_process_latency_ms.labels(*metrics_labels)._sum.set(0.0)
+ self.stat_process_latency_ms.labels(*metrics_labels)._count.set(0.0)
+ self.stat_last_invocation.labels(*metrics_labels).set(0.0)
+ self.stat_total_received.labels(*metrics_labels)._value.set(0.0)
\ No newline at end of file
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py
b/pulsar-functions/instance/src/main/python/python_instance.py
index 6f74461e85..1036b23132 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -31,14 +31,8 @@
except:
import queue
import threading
-from functools import partial
-from collections import namedtuple
-from threading import Timer
-from prometheus_client import Counter, Summary
-import traceback
import sys
import re
-
import pulsar
import contextimpl
import Function_pb2
@@ -46,6 +40,11 @@
import util
import InstanceCommunication_pb2
+from functools import partial
+from collections import namedtuple
+from threading import Timer
+from function_stats import Stats
+
Log = log.Log
# Equivalent of the InstanceConfig in Java
InstanceConfig = namedtuple('InstanceConfig', 'instance_id function_id
function_version function_details max_buffered_tuples')
@@ -68,55 +67,9 @@ def base64ify(bytes_or_str):
else:
return output_bytes
-# We keep track of the following metrics
-class Stats(object):
- metrics_label_names = ['tenant', 'namespace', 'name', 'instance_id']
-
- TOTAL_PROCESSED = '__function_total_processed__'
- TOTAL_SUCCESSFULLY_PROCESSED = '__function_total_successfully_processed__'
- TOTAL_SYSTEM_EXCEPTIONS = '__function_total_system_exceptions__'
- TOTAL_USER_EXCEPTIONS = '__function_total_user_exceptions__'
- PROCESS_LATENCY_MS = '__function_process_latency_ms__'
-
- # Declare Prometheus
- stat_total_processed = Counter(TOTAL_PROCESSED, 'Total number of messages
processed.', metrics_label_names)
- stat_total_processed_successfully = Counter(TOTAL_SUCCESSFULLY_PROCESSED,
- 'Total number of messages
processed successfully.', metrics_label_names)
- stat_total_sys_exceptions = Counter(TOTAL_SYSTEM_EXCEPTIONS, 'Total number
of system exceptions.',
- metrics_label_names)
- stat_total_user_exceptions = Counter(TOTAL_USER_EXCEPTIONS, 'Total number of
user exceptions.',
- metrics_label_names)
-
- stats_process_latency_ms = Summary(PROCESS_LATENCY_MS, 'Process latency in
milliseconds.', metrics_label_names)
-
- latest_user_exception = []
- latest_sys_exception = []
-
- last_invocation_time = 0.0
-
- def add_user_exception(self):
- self.latest_sys_exception.append((traceback.format_exc(), int(time.time()
* 1000)))
- if len(self.latest_sys_exception) > 10:
- self.latest_sys_exception.pop(0)
-
- def add_sys_exception(self):
- self.latest_sys_exception.append((traceback.format_exc(), int(time.time()
* 1000)))
- if len(self.latest_sys_exception) > 10:
- self.latest_sys_exception.pop(0)
-
- def reset(self, metrics_labels):
- self.latest_user_exception = []
- self.latest_sys_exception = []
- self.stat_total_processed.labels(*metrics_labels)._value.set(0.0)
-
self.stat_total_processed_successfully.labels(*metrics_labels)._value.set(0.0)
- self.stat_total_user_exceptions.labels(*metrics_labels)._value.set(0.0)
- self.stat_total_sys_exceptions.labels(*metrics_labels)._value.set(0.0)
- self.stats_process_latency_ms.labels(*metrics_labels)._sum.set(0)
- self.stats_process_latency_ms.labels(*metrics_labels)._count.set(0);
- self.last_invocation_time = 0.0
-
class PythonInstance(object):
- def __init__(self, instance_id, function_id, function_version,
function_details, max_buffered_tuples, expected_healthcheck_interval,
user_code, pulsar_client, secrets_provider):
+ def __init__(self, instance_id, function_id, function_version,
function_details, max_buffered_tuples,
+ expected_healthcheck_interval, user_code, pulsar_client,
secrets_provider, cluster_name):
self.instance_config = InstanceConfig(instance_id, function_id,
function_version, function_details, max_buffered_tuples)
self.user_code = user_code
self.queue = queue.Queue(max_buffered_tuples)
@@ -140,7 +93,10 @@ def __init__(self, instance_id, function_id,
function_version, function_details,
self.timeout_ms = function_details.source.timeoutMs if
function_details.source.timeoutMs > 0 else None
self.expected_healthcheck_interval = expected_healthcheck_interval
self.secrets_provider = secrets_provider
- self.metrics_labels = [function_details.tenant,
function_details.namespace, function_details.name, instance_id]
+ self.metrics_labels = [function_details.tenant,
+ "%s/%s" % (function_details.tenant,
function_details.namespace),
+ "%s/%s/%s" % (function_details.tenant,
function_details.namespace, function_details.name),
+ instance_id, cluster_name]
def health_check(self):
self.last_health_check_ts = time.time()
@@ -210,7 +166,9 @@ def run(self):
except:
self.function_purefunction = function_kclass
- self.contextimpl = contextimpl.ContextImpl(self.instance_config, Log,
self.pulsar_client, self.user_code, self.consumers, self.secrets_provider)
+ self.contextimpl = contextimpl.ContextImpl(self.instance_config, Log,
self.pulsar_client,
+ self.user_code, self.consumers,
+ self.secrets_provider,
self.metrics_labels)
# Now launch a thread that does execution
self.execution_thread = threading.Thread(target=self.actual_execution)
self.execution_thread.start()
@@ -242,13 +200,13 @@ def actual_execution(self):
try:
# get user function start time for statistic calculation
start_time = time.time()
- self.stats.last_invocation_time = start_time * 1000.0
+
Stats.stat_last_invocation.labels(*self.metrics_labels).set(start_time * 1000.0)
if self.function_class is not None:
output_object = self.function_class.process(input_object,
self.contextimpl)
else:
output_object = self.function_purefunction.process(input_object)
successfully_executed = True
-
Stats.stats_process_latency_ms.labels(*self.metrics_labels).observe((time.time()
- start_time) * 1000.0)
+
Stats.stat_process_latency_ms.labels(*self.metrics_labels).observe((time.time()
- start_time) * 1000.0)
Stats.stat_total_processed.labels(*self.metrics_labels).inc()
except Exception as e:
Log.exception("Exception while executing user method")
@@ -309,6 +267,8 @@ def setup_producer(self):
max_pending_messages=100000)
def message_listener(self, serde, consumer, message):
+ # increment number of received records from source
+ Stats.stat_total_received.labels(*self.metrics_labels).inc()
item = InternalMessage(message, consumer.topic(), serde, consumer)
self.queue.put(item, True)
if self.atmost_once and self.auto_ack:
@@ -328,14 +288,16 @@ def get_metrics(self):
# First get any user metrics
metrics = self.contextimpl.get_metrics()
# Now add system metrics as well
- self.add_system_metrics("__total_processed__",
Stats.stat_total_processed.labels(*self.metrics_labels)._value.get(), metrics)
- self.add_system_metrics("__total_successfully_processed__",
Stats.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get(),
metrics)
- self.add_system_metrics("__total_system_exceptions__",
Stats.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get(),
metrics)
- self.add_system_metrics("__total_user_exceptions__",
Stats.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get(),
metrics)
- self.add_system_metrics("__avg_latency_ms__",
- 0.0 if
Stats.stats_process_latency_ms.labels(*self.metrics_labels)._count.get() <= 0.0
- else
Stats.stats_process_latency_ms.labels(*self.metrics_labels)._sum.get() /
Stats.stats_process_latency_ms.labels(*self.metrics_labels)._count.get(),
+ self.add_system_metrics(Stats.TOTAL_PROCESSED,
Stats.stat_total_processed.labels(*self.metrics_labels)._value.get(), metrics)
+ self.add_system_metrics(Stats.TOTAL_SUCCESSFULLY_PROCESSED,
Stats.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get(),
metrics)
+ self.add_system_metrics(Stats.TOTAL_SYSTEM_EXCEPTIONS,
Stats.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get(),
metrics)
+ self.add_system_metrics(Stats.TOTAL_USER_EXCEPTIONS,
Stats.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get(),
metrics)
+ self.add_system_metrics(Stats.PROCESS_LATENCY_MS,
+ 0.0 if
Stats.stat_process_latency_ms.labels(*self.metrics_labels)._count.get() <= 0.0
+ else
Stats.stat_process_latency_ms.labels(*self.metrics_labels)._sum.get() /
Stats.stat_process_latency_ms.labels(*self.metrics_labels)._count.get(),
metrics)
+ self.add_system_metrics(Stats.TOTAL_RECEIVED,
Stats.stat_total_received.labels(*self.metrics_labels)._value.get(), metrics)
+ self.add_system_metrics(Stats.LAST_INVOCATION,
Stats.stat_last_invocation.labels(*self.metrics_labels)._value.get(), metrics)
return metrics
def add_system_metrics(self, metric_name, value, metrics):
@@ -347,23 +309,32 @@ def add_system_metrics(self, metric_name, value, metrics):
def get_function_status(self):
status = InstanceCommunication_pb2.FunctionStatus()
status.running = True
- status.numProcessed =
long(Stats.stat_total_processed.labels(*self.metrics_labels)._value.get())
- status.numSuccessfullyProcessed =
long(Stats.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get())
- status.numUserExceptions =
long(Stats.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get())
+
+ total_processed =
Stats.stat_total_processed.labels(*self.metrics_labels)._value.get()
+ stat_total_processed_successfully =
Stats.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get()
+ stat_total_user_exceptions =
Stats.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get()
+ stat_total_sys_exceptions =
Stats.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get()
+ stat_process_latency_ms_count =
Stats.stat_process_latency_ms.labels(*self.metrics_labels)._count.get()
+ stat_process_latency_ms_sum =
Stats.stat_process_latency_ms.labels(*self.metrics_labels)._sum.get()
+ stat_last_invocation =
Stats.stat_last_invocation.labels(*self.metrics_labels)._value.get()
+
+ status.numProcessed = int(total_processed) if sys.version_info.major >= 3
else long(total_processed)
+ status.numSuccessfullyProcessed = int(stat_total_processed_successfully)
if sys.version_info.major >= 3 else long(stat_total_processed_successfully)
+ status.numUserExceptions = int(stat_total_user_exceptions) if
sys.version_info.major >= 3 else long(stat_total_user_exceptions)
status.instanceId = self.instance_config.instance_id
for ex, tm in self.stats.latest_user_exception:
to_add = status.latestUserExceptions.add()
to_add.exceptionString = ex
to_add.msSinceEpoch = tm
- status.numSystemExceptions =
long(Stats.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get())
+ status.numSystemExceptions = int(stat_total_sys_exceptions) if
sys.version_info.major >= 3 else long(stat_total_sys_exceptions)
for ex, tm in self.stats.latest_sys_exception:
to_add = status.latestSystemExceptions.add()
to_add.exceptionString = ex
to_add.msSinceEpoch = tm
status.averageLatency = 0.0 \
- if
Stats.stats_process_latency_ms.labels(*self.metrics_labels)._count.get() <= 0.0
\
- else
Stats.stats_process_latency_ms.labels(*self.metrics_labels)._sum.get() /
Stats.stats_process_latency_ms.labels(*self.metrics_labels)._count.get()
- status.lastInvocationTime = long(self.stats.last_invocation_time)
+ if stat_process_latency_ms_count <= 0.0 \
+ else stat_process_latency_ms_sum / stat_process_latency_ms_count
+ status.lastInvocationTime = int(stat_last_invocation) if
sys.version_info.major >= 3 else long(stat_last_invocation)
return status
def join(self):
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py
b/pulsar-functions/instance/src/main/python/python_instance_main.py
index d2562697f3..2a023806a5 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -39,6 +39,8 @@
import server
import python_instance
import util
+import prometheus_client
+
from google.protobuf import json_format
to_run = True
@@ -69,6 +71,7 @@ def main():
parser.add_argument('--hostname_verification_enabled', required=False,
help='Enable hostname verification')
parser.add_argument('--tls_trust_cert_path', required=False, help='Tls trust
cert file path')
parser.add_argument('--port', required=True, help='Instance Port', type=int)
+ parser.add_argument('--metrics_port', required=True, help="Port metrics will
be exposed on", type=int)
parser.add_argument('--max_buffered_tuples', required=True, help='Maximum
number of Buffered tuples')
parser.add_argument('--logging_directory', required=True, help='Logging
Directory')
parser.add_argument('--logging_file', required=True, help='Log file name')
@@ -79,6 +82,7 @@ def main():
parser.add_argument('--install_usercode_dependencies', required=False,
help='For packaged python like wheel files, do we need to install all
dependencies', type=bool)
parser.add_argument('--dependency_repository', required=False, help='For
packaged python like wheel files, which repository to pull the dependencies
from')
parser.add_argument('--extra_dependency_repository', required=False,
help='For packaged python like wheel files, any extra repository to pull the
dependencies from')
+ parser.add_argument('--cluster_name', required=True, help='The name of the
cluster this instance is running on')
args = parser.parse_args()
function_details = Function_pb2.FunctionDetails()
@@ -166,10 +170,12 @@ def main():
str(args.function_version),
function_details,
int(args.max_buffered_tuples),
int(args.expected_healthcheck_interval),
- str(args.py), pulsar_client,
secrets_provider)
+ str(args.py), pulsar_client,
secrets_provider, args.cluster_name)
pyinstance.run()
server_instance = server.serve(args.port, pyinstance)
+ prometheus_client.start_http_server(args.metrics_port)
+
global to_run
while to_run:
time.sleep(1)
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index e3e32fda3e..0ac3502fd6 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -30,6 +30,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import io.prometheus.client.CollectorRegistry;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
@@ -75,7 +76,7 @@ public void setup() {
logger,
client,
new ArrayList<>(),
- new EnvironmentBasedSecretsProvider()
+ new EnvironmentBasedSecretsProvider(), new CollectorRegistry(),
new String[0]
);
}
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
index 80b3b1da5a..b927c49d87 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
@@ -56,7 +56,7 @@ private static InstanceConfig createInstanceConfig(boolean
addCustom, String out
private JavaInstanceRunnable createRunnable(boolean addCustom, String
outputSerde) throws Exception {
InstanceConfig config = createInstanceConfig(addCustom, outputSerde);
JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
- config, null, null, null, null, null);
+ config, null, null, null, null, null, null);
return javaInstanceRunnable;
}
diff --git a/pulsar-functions/instance/src/test/python/test_python_instance.py
b/pulsar-functions/instance/src/test/python/test_python_instance.py
index 748e5d80a8..b865a9df82 100644
--- a/pulsar-functions/instance/src/test/python/test_python_instance.py
+++ b/pulsar-functions/instance/src/test/python/test_python_instance.py
@@ -48,7 +48,7 @@ def test_context_publish(self):
pulsar_client.create_producer = Mock(return_value=producer)
user_code=__file__
consumers = None
- context_impl = ContextImpl(instance_config, logger, pulsar_client,
user_code, consumers, None)
+ context_impl = ContextImpl(instance_config, logger, pulsar_client,
user_code, consumers, None, None)
context_impl.publish("test_topic_name", "test_message")
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 98986dd566..d5fb80ee73 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -29,8 +29,17 @@
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.exporter.HTTPServer;
+import io.prometheus.client.hotspot.BufferPoolsExports;
+import io.prometheus.client.hotspot.ClassLoadingExports;
import io.prometheus.client.hotspot.DefaultExports;
+import io.prometheus.client.hotspot.GarbageCollectorExports;
+import io.prometheus.client.hotspot.MemoryPoolsExports;
+import io.prometheus.client.hotspot.StandardExports;
+import io.prometheus.client.hotspot.ThreadExports;
+import io.prometheus.client.hotspot.VersionInfoExports;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
@@ -43,12 +52,14 @@
import org.apache.pulsar.functions.utils.Reflections;
import java.lang.reflect.Type;
+import java.net.InetSocketAddress;
import java.util.Map;
import java.util.TimerTask;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
/**
* A function container implemented using java thread.
@@ -99,6 +110,9 @@
@Parameter(names = "--port", description = "Port to listen on\n", required
= true)
protected int port;
+ @Parameter(names = "--metrics_port", description = "Port metrics will be
exposed on\n", required = true)
+ protected int metrics_port;
+
@Parameter(names = "--max_buffered_tuples", description = "Maximum number
of tuples to buffer\n", required = true)
protected int maxBufferedTuples;
@@ -111,11 +125,15 @@
@Parameter(names = "--secrets_provider_config", description = "The config
that needs to be passed to secrets provider", required = false)
protected String secretsProviderConfig;
+ @Parameter(names = "--cluster_name", description = "The name of the
cluster this instance is running on", required = true)
+ protected String clusterName;
+
private Server server;
private RuntimeSpawner runtimeSpawner;
private ThreadRuntimeFactory containerFactory;
private Long lastHealthCheckTs = null;
private ScheduledExecutorService timer;
+ private HTTPServer metricsServer;
public JavaInstanceMain() { }
@@ -126,6 +144,7 @@ public void start() throws Exception {
instanceConfig.setFunctionVersion(functionVersion);
instanceConfig.setInstanceId(instanceId);
instanceConfig.setMaxBufferedTuples(maxBufferedTuples);
+ instanceConfig.setClusterName(clusterName);
FunctionDetails.Builder functionDetailsBuilder =
FunctionDetails.newBuilder();
if (functionDetailsJsonString.charAt(0) == '\'') {
functionDetailsJsonString = functionDetailsJsonString.substring(1);
@@ -162,6 +181,9 @@ public void start() throws Exception {
}
secretsProvider.init(secretsProviderConfigMap);
+ // Collector Registry for prometheus metrics
+ CollectorRegistry collectorRegistry = new CollectorRegistry();
+
containerFactory = new ThreadRuntimeFactory("LocalRunnerThreadGroup",
pulsarServiceUrl,
stateStorageServiceUrl,
AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthenticationPlugin)
@@ -169,7 +191,7 @@ public void start() throws Exception {
.tlsAllowInsecureConnection(isTrue(tlsAllowInsecureConnection))
.tlsHostnameVerificationEnable(isTrue(tlsHostNameVerificationEnabled))
.tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
- secretsProvider);
+ secretsProvider, collectorRegistry);
runtimeSpawner = new RuntimeSpawner(
instanceConfig,
jarFile,
@@ -195,12 +217,13 @@ public void run() {
}
});
- // registering jvm metrics to prometheus
- DefaultExports.initialize();
-
log.info("Starting runtimeSpawner");
runtimeSpawner.start();
+ // starting metrics server
+ log.info("Starting metrics server on port {}", metrics_port);
+ metricsServer = new HTTPServer(new InetSocketAddress(metrics_port),
collectorRegistry, true);
+
if (expectedHealthCheckInterval > 0) {
timer = Executors.newSingleThreadScheduledExecutor();
timer.scheduleAtFixedRate(new TimerTask() {
@@ -253,6 +276,9 @@ public void close() {
if (containerFactory != null) {
containerFactory.close();
}
+ if (metricsServer != null) {
+ metricsServer.stop();
+ }
} catch (Exception ex) {
System.err.println(ex);
}
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index 6e716f21dc..4dc97ace14 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -88,9 +88,7 @@
private static final String ENV_SHARD_ID = "SHARD_ID";
private static final int maxJobNameSize = 55;
private static final Integer GRPC_PORT = 9093;
- private static final Integer PROMETHEUS_PORT = 9094;
- private static final Double prometheusMetricsServerCpu = 0.1;
- private static final Long prometheusMetricsServerRam = 125000000l;
+ private static final Integer METRICS_PORT = 9094;
public static final Pattern VALID_POD_NAME_REGEX =
Pattern.compile("[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*",
Pattern.CASE_INSENSITIVE);
@@ -110,7 +108,6 @@
// The thread that invokes the function
@Getter
private List<String> processArgs;
- private List<String> prometheusMetricsServerArgs;
@Getter
private ManagedChannel[] channel;
private InstanceControlGrpc.InstanceControlFutureStub[] stub;
@@ -190,8 +187,8 @@
secretsProviderConfig,
installUserCodeDependencies,
pythonDependencyRepository,
- pythonExtraDependencyRepository);
- this.prometheusMetricsServerArgs =
composePrometheusMetricsServerArgs(prometheusMetricsServerJarFile,
expectedMetricsInterval);
+ pythonExtraDependencyRepository,
+ METRICS_PORT);
running = false;
doChecks(instanceConfig.getFunctionDetails());
}
@@ -453,14 +450,6 @@ public void deleteService() throws Exception {
);
}
- protected List<String> getPrometheusMetricsServerCommand() {
- return Arrays.asList(
- "sh",
- "-c",
- String.join(" ", prometheusMetricsServerArgs)
- );
- }
-
private List<String> getDownloadCommand(String bkPath, String
userCodeFilePath) {
return Arrays.asList(
pulsarRootDir + "/bin/pulsar-admin",
@@ -525,15 +514,16 @@ private V1StatefulSet createStatefulSet() {
private Map<String, String> getPrometheusAnnotations() {
final Map<String, String> annotations = new HashMap<>();
annotations.put("prometheus.io/scrape", "true");
- annotations.put("prometheus.io/port", String.valueOf(PROMETHEUS_PORT));
+ annotations.put("prometheus.io/port", String.valueOf(METRICS_PORT));
return annotations;
}
private Map<String, String> getLabels(Function.FunctionDetails
functionDetails) {
final Map<String, String> labels = new HashMap<>();
- labels.put("app", createJobName(functionDetails));
- labels.put("namespace", functionDetails.getNamespace());
+ labels.put("namespace",
String.format("%s/%s",functionDetails.getTenant(),
functionDetails.getNamespace()));
labels.put("tenant", functionDetails.getTenant());
+ labels.put("function", String.format("%s/%s/%s",
functionDetails.getTenant(),
+ functionDetails.getNamespace(), functionDetails.getName()));
if (customLabels != null && !customLabels.isEmpty()) {
labels.putAll(customLabels);
}
@@ -552,7 +542,6 @@ private V1PodSpec getPodSpec(List<String> instanceCommand,
Function.Resources re
List<V1Container> containers = new LinkedList<>();
containers.add(getFunctionContainer(instanceCommand, resource));
- containers.add(getPrometheusContainer());
podSpec.containers(containers);
// Configure secrets
@@ -607,38 +596,6 @@ private V1Container getFunctionContainer(List<String>
instanceCommand, Function.
return container;
}
- private V1Container getPrometheusContainer() {
- final V1Container container = new
V1Container().name("prometheusmetricsserver");
-
- // set up the container images
- container.setImage(pulsarDockerImageName);
-
- // set up the container command
- container.setCommand(getPrometheusMetricsServerCommand());
-
- // setup the environment variables for the container
- final V1EnvVar envVarPodName = new V1EnvVar();
- envVarPodName.name("POD_NAME")
- .valueFrom(new V1EnvVarSource()
- .fieldRef(new V1ObjectFieldSelector()
- .fieldPath("metadata.name")));
- container.setEnv(Arrays.asList(envVarPodName));
-
-
- // set container resources
- final V1ResourceRequirements resourceRequirements = new
V1ResourceRequirements();
- final Map<String, Quantity> requests = new HashMap<>();
- requests.put("memory",
Quantity.fromString(Long.toString(prometheusMetricsServerRam)));
- requests.put("cpu",
Quantity.fromString(Double.toString(prometheusMetricsServerCpu)));
- resourceRequirements.setRequests(requests);
- container.setResources(resourceRequirements);
-
- // set container ports
- container.setPorts(getPrometheusContainerPorts());
-
- return container;
- }
-
private List<V1ContainerPort> getFunctionContainerPorts() {
List<V1ContainerPort> ports = new ArrayList<>();
final V1ContainerPort port = new V1ContainerPort();
@@ -652,7 +609,7 @@ private V1Container getPrometheusContainer() {
List<V1ContainerPort> ports = new ArrayList<>();
final V1ContainerPort port = new V1ContainerPort();
port.setName("prometheus");
- port.setContainerPort(PROMETHEUS_PORT);
+ port.setContainerPort(METRICS_PORT);
ports.add(port);
return ports;
}
@@ -680,24 +637,4 @@ public static void doChecks(Function.FunctionDetails
functionDetails) {
throw new RuntimeException("Kubernetes job name size should be
less than " + maxJobNameSize);
}
}
-
- private List<String> composePrometheusMetricsServerArgs(String
prometheusMetricsServerFile,
- Integer
expectedMetricsInterval) throws Exception {
- List<String> args = new LinkedList<>();
- args.add("java");
- args.add("-cp");
- args.add(prometheusMetricsServerFile);
-
args.add("-Dlog4j.configurationFile=prometheus_metricsserver_log4j2.yml");
- args.add("-Xmx" + String.valueOf(prometheusMetricsServerRam));
- args.add(PrometheusMetricsServer.class.getName());
- args.add("--function_details");
- args.add("'" +
JsonFormat.printer().omittingInsignificantWhitespace().print(instanceConfig.getFunctionDetails())
+ "'");
- args.add("--prometheus_port");
- args.add(String.valueOf(PROMETHEUS_PORT));
- args.add("--grpc_port");
- args.add(String.valueOf(GRPC_PORT));
- args.add("--collection_interval");
- args.add(String.valueOf(expectedMetricsInterval));
- return args;
- }
}
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
index e85a45d593..733be37dd5 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
@@ -189,6 +189,7 @@ protected static void
startLocalRun(org.apache.pulsar.functions.proto.Function.F
instanceConfig.setInstanceId(i + instanceIdOffset);
instanceConfig.setMaxBufferedTuples(1024);
instanceConfig.setPort(Utils.findAvailablePort());
+ instanceConfig.setClusterName("local");
RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
instanceConfig,
userCodeFile,
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 0e4319f561..9ac20ab80d 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -36,6 +36,7 @@
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import org.apache.pulsar.functions.proto.InstanceControlGrpc;
import
org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
+import org.apache.pulsar.functions.utils.Utils;
import java.io.InputStream;
import java.util.List;
@@ -117,7 +118,8 @@
secretsProviderConfig,
false,
null,
- null);
+ null,
+ Utils.findAvailablePort());
}
/**
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index e84ec203fe..05f21e614c 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -57,7 +57,8 @@
String secretsProviderConfig,
Boolean
installUserCodeDepdendencies,
String pythonDependencyRepository,
- String
pythonExtraDependencyRepository) throws Exception {
+ String
pythonExtraDependencyRepository,
+ int metricsPort) throws Exception {
List<String> args = new LinkedList<>();
if (instanceConfig.getFunctionDetails().getRuntime() ==
Function.FunctionDetails.Runtime.JAVA) {
args.add("java");
@@ -160,6 +161,9 @@
args.add("--port");
args.add(String.valueOf(grpcPort));
+ args.add("--metrics_port");
+ args.add(String.valueOf(metricsPort));
+
// state storage configs
if (null != stateStorageServiceUrl
&& instanceConfig.getFunctionDetails().getRuntime() ==
Function.FunctionDetails.Runtime.JAVA) {
@@ -177,6 +181,9 @@
args.add("'" + secretsProviderConfig + "'");
}
}
+
+ args.add("--cluster_name");
+ args.add(instanceConfig.getClusterName());
return args;
}
}
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
index 5e42c52ea2..913d4ae2a8 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
@@ -21,6 +21,7 @@
import java.util.concurrent.CompletableFuture;
+import io.prometheus.client.CollectorRegistry;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.PulsarClient;
@@ -53,18 +54,28 @@
String jarFile,
PulsarClient pulsarClient,
String stateStorageServiceUrl,
- SecretsProvider secretsProvider) {
+ SecretsProvider secretsProvider,
+ CollectorRegistry collectorRegistry) {
this.instanceConfig = instanceConfig;
if (instanceConfig.getFunctionDetails().getRuntime() !=
Function.FunctionDetails.Runtime.JAVA) {
throw new RuntimeException("Thread Container only supports Java
Runtime");
}
+
+ // if collector registry is not set, create one for this thread.
+ // since each thread / instance will needs its own collector registry
for metrics collection
+ CollectorRegistry instanceCollectorRegistry = collectorRegistry;
+ if (instanceCollectorRegistry == null) {
+ instanceCollectorRegistry = new CollectorRegistry();
+ }
+
this.javaInstanceRunnable = new JavaInstanceRunnable(
instanceConfig,
fnCache,
jarFile,
pulsarClient,
stateStorageServiceUrl,
- secretsProvider);
+ secretsProvider,
+ instanceCollectorRegistry);
this.threadGroup = threadGroup;
}
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
index 846028d40a..ed76117735 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
@@ -21,6 +21,7 @@
import com.google.common.annotations.VisibleForTesting;
+import io.prometheus.client.CollectorRegistry;
import lombok.extern.slf4j.Slf4j;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
@@ -45,21 +46,23 @@
private final PulsarClient pulsarClient;
private final String storageServiceUrl;
private final SecretsProvider secretsProvider;
+ private final CollectorRegistry collectorRegistry;
private volatile boolean closed;
public ThreadRuntimeFactory(String threadGroupName, String
pulsarServiceUrl, String storageServiceUrl,
- AuthenticationConfig authConfig,
SecretsProvider secretsProvider) throws Exception {
- this(threadGroupName, createPulsarClient(pulsarServiceUrl,
authConfig), storageServiceUrl, secretsProvider);
+ AuthenticationConfig authConfig,
SecretsProvider secretsProvider, CollectorRegistry collectorRegistry) throws
Exception {
+ this(threadGroupName, createPulsarClient(pulsarServiceUrl,
authConfig), storageServiceUrl, secretsProvider, collectorRegistry);
}
@VisibleForTesting
public ThreadRuntimeFactory(String threadGroupName, PulsarClient
pulsarClient, String storageServiceUrl,
- SecretsProvider secretsProvider) {
+ SecretsProvider secretsProvider,
CollectorRegistry collectorRegistry) {
this.secretsProvider = secretsProvider;
this.fnCache = new FunctionCacheManagerImpl();
this.threadGroup = new ThreadGroup(threadGroupName);
this.pulsarClient = pulsarClient;
this.storageServiceUrl = storageServiceUrl;
+ this.collectorRegistry = collectorRegistry;
}
private static PulsarClient createPulsarClient(String pulsarServiceUrl,
AuthenticationConfig authConfig)
@@ -94,7 +97,8 @@ public ThreadRuntime createContainer(InstanceConfig
instanceConfig, String jarFi
jarFile,
pulsarClient,
storageServiceUrl,
- secretsProvider);
+ secretsProvider,
+ collectorRegistry);
}
@Override
diff --git
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
index 2e00983ec5..c22970efb8 100644
---
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
+++
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
@@ -192,6 +192,7 @@ InstanceConfig
createJavaInstanceConfig(FunctionDetails.Runtime runtime, boolean
config.setFunctionVersion("1.0");
config.setInstanceId(0);
config.setMaxBufferedTuples(1024);
+ config.setClusterName("standalone");
return config;
}
@@ -233,16 +234,19 @@ private void verifyJavaInstance(InstanceConfig config,
String depsDir, boolean s
String classpath = javaInstanceJarFile;
String extraDepsEnv;
int portArg;
+ int metricsPortArg;
int totalArgs;
if (null != depsDir) {
extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" +
depsDir;
classpath = classpath + ":" + depsDir + "/*";
- totalArgs = 29;
+ totalArgs = 33;
portArg = 24;
+ metricsPortArg = 26;
} else {
extraDepsEnv = "";
portArg = 23;
- totalArgs = 28;
+ metricsPortArg = 25;
+ totalArgs = 32;
}
if (secretsAttached) {
totalArgs += 4;
@@ -263,13 +267,15 @@ private void verifyJavaInstance(InstanceConfig config,
String depsDir, boolean s
+ " --function_version " + config.getFunctionVersion()
+ " --function_details '" +
JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
+ "' --pulsar_serviceurl " + pulsarServiceUrl
- + " --max_buffered_tuples 1024 --port " + args.get(portArg)
+ + " --max_buffered_tuples 1024 --port " + args.get(portArg) +
" --metrics_port " + args.get(metricsPortArg)
+ " --state_storage_serviceurl " + stateStorageServiceUrl
+ " --expected_healthcheck_interval -1";
if (secretsAttached) {
expectedArgs += " --secrets_provider
org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider"
+ " --secrets_provider_config
'{\"Somevalue\":\"myvalue\"}'";
}
+ expectedArgs += " --cluster_name standalone";
+
assertEquals(String.join(" ", args), expectedArgs);
}
@@ -301,15 +307,18 @@ private void verifyPythonInstance(InstanceConfig config,
String extraDepsDir, bo
int portArg;
String pythonPath;
int configArg;
+ int metricsPortArg;
if (null == extraDepsDir) {
- totalArgs = 32;
+ totalArgs = 36;
portArg = 29;
configArg = 9;
pythonPath = "";
+ metricsPortArg = 31;
} else {
- totalArgs = 33;
+ totalArgs = 37;
portArg = 30;
configArg = 10;
+ metricsPortArg = 32;
pythonPath = "PYTHONPATH=${PYTHONPATH}:" + extraDepsDir + " ";
}
if (secretsAttached) {
@@ -331,12 +340,13 @@ private void verifyPythonInstance(InstanceConfig config,
String extraDepsDir, bo
+ " --function_version " + config.getFunctionVersion()
+ " --function_details '" +
JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
+ "' --pulsar_serviceurl " + pulsarServiceUrl
- + " --max_buffered_tuples 1024 --port " + args.get(portArg)
+ + " --max_buffered_tuples 1024 --port " + args.get(portArg) +
" --metrics_port " + args.get(metricsPortArg)
+ " --expected_healthcheck_interval -1";
if (secretsAttached) {
expectedArgs += " --secrets_provider
secretsprovider.ClearTextSecretsProvider"
+ " --secrets_provider_config
'{\"Somevalue\":\"myvalue\"}'";
}
+ expectedArgs += " --cluster_name standalone";
assertEquals(String.join(" ", args), expectedArgs);
}
diff --git
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index 63875fd0f1..323943c13d 100644
---
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -172,6 +172,7 @@ InstanceConfig
createJavaInstanceConfig(FunctionDetails.Runtime runtime) {
config.setFunctionVersion("1.0");
config.setInstanceId(0);
config.setMaxBufferedTuples(1024);
+ config.setClusterName("standalone");
return config;
}
@@ -250,15 +251,18 @@ private void verifyJavaInstance(InstanceConfig config,
Path depsDir) throws Exce
String classpath = javaInstanceJarFile;
String extraDepsEnv;
int portArg;
+ int metricsPortArg;
if (null != depsDir) {
- assertEquals(args.size(), 33);
+ assertEquals(args.size(), 37);
extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" +
depsDir.toString();
classpath = classpath + ":" + depsDir + "/*";
portArg = 24;
+ metricsPortArg = 26;
} else {
- assertEquals(args.size(), 32);
+ assertEquals(args.size(), 36);
extraDepsEnv = "";
portArg = 23;
+ metricsPortArg = 25;
}
String expectedArgs = "java -cp " + classpath
@@ -273,11 +277,12 @@ private void verifyJavaInstance(InstanceConfig config,
Path depsDir) throws Exce
+ " --function_version " + config.getFunctionVersion()
+ " --function_details '" +
JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
+ "' --pulsar_serviceurl " + pulsarServiceUrl
- + " --max_buffered_tuples 1024 --port " + args.get(portArg)
+ + " --max_buffered_tuples 1024 --port " + args.get(portArg) +
" --metrics_port " + args.get(metricsPortArg)
+ " --state_storage_serviceurl " + stateStorageServiceUrl
+ " --expected_healthcheck_interval 30"
+ " --secrets_provider
org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider"
- + " --secrets_provider_config '{\"Config\":\"Value\"}'";
+ + " --secrets_provider_config '{\"Config\":\"Value\"}'"
+ + " --cluster_name standalone";
assertEquals(String.join(" ", args), expectedArgs);
}
@@ -305,8 +310,9 @@ private void verifyPythonInstance(InstanceConfig config,
String extraDepsDir) th
ProcessRuntime container = factory.createContainer(config,
userJarFile, null, 30l);
List<String> args = container.getProcessArgs();
- int totalArgs = 30;
+ int totalArgs = 34;
int portArg = 23;
+ int metricsPortArg = 25;
String pythonPath = "";
int configArg = 9;
@@ -319,10 +325,11 @@ private void verifyPythonInstance(InstanceConfig config,
String extraDepsDir) th
+ " --function_version " + config.getFunctionVersion()
+ " --function_details '" +
JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
+ "' --pulsar_serviceurl " + pulsarServiceUrl
- + " --max_buffered_tuples 1024 --port " + args.get(portArg)
+ + " --max_buffered_tuples 1024 --port " + args.get(portArg) +
" --metrics_port " + args.get(metricsPortArg)
+ " --expected_healthcheck_interval 30"
+ " --secrets_provider
secretsprovider.ClearTextSecretsProvider"
- + " --secrets_provider_config '{\"Config\":\"Value\"}'";
+ + " --secrets_provider_config '{\"Config\":\"Value\"}'"
+ + " --cluster_name standalone";
assertEquals(String.join(" ", args), expectedArgs);
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index 4f8d04a71d..46343a5b86 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -172,6 +172,7 @@ public void startFunction(FunctionRuntimeInfo
functionRuntimeInfo) throws Except
instanceConfig.setInstanceId(instanceId);
instanceConfig.setMaxBufferedTuples(1024);
instanceConfig.setPort(org.apache.pulsar.functions.utils.Utils.findAvailablePort());
+
instanceConfig.setClusterName(workerConfig.getPulsarFunctionsCluster());
log.info("{}/{}/{}-{} start process with instance config {}",
functionDetails.getTenant(), functionDetails.getNamespace(),
functionDetails.getName(), instanceId, instanceConfig);
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 25b97ce9c4..8a8fafa1a5 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -36,6 +36,7 @@
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
+import io.prometheus.client.CollectorRegistry;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
import org.apache.distributedlog.api.namespace.Namespace;
@@ -130,7 +131,8 @@ public FunctionRuntimeManager(WorkerConfig workerConfig,
WorkerService workerSer
workerConfig.getPulsarServiceUrl(),
workerConfig.getStateStorageServiceUrl(),
authConfig,
- new ClearTextSecretsProvider());
+ new ClearTextSecretsProvider(),
+ null);
} else if (workerConfig.getProcessContainerFactory() != null) {
this.runtimeFactory = new ProcessRuntimeFactory(
workerConfig.getPulsarServiceUrl(),
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
index e497ff3352..d2f1504510 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
@@ -73,13 +73,13 @@ public static void generate(WorkerService workerService,
String cluster, SimpleT
int instanceId =
functionRuntimeInfo.getFunctionInstance().getInstanceId();
String qualifiedNamespace =
String.format("%s/%s", tenant, namespace);
- metric(out, cluster, qualifiedNamespace, name,
String.format("%scount", metricName),
+ metric(out, cluster, qualifiedNamespace, name,
String.format("%s_count", metricName),
instanceId, dataDigest.getCount());
- metric(out, cluster, qualifiedNamespace, name,
String.format("%smax", metricName),
+ metric(out, cluster, qualifiedNamespace, name,
String.format("%s_max", metricName),
instanceId, dataDigest.getMax());
- metric(out, cluster, qualifiedNamespace,name,
String.format("%smin", metricName),
+ metric(out, cluster, qualifiedNamespace,name,
String.format("%s_min", metricName),
instanceId, dataDigest.getMin());
- metric(out, cluster, qualifiedNamespace, name,
String.format("%ssum", metricName),
+ metric(out, cluster, qualifiedNamespace, name,
String.format("%s_sum", metricName),
instanceId, dataDigest.getSum());
}
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
index e35aa2b90b..c957de7e4b 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
@@ -87,10 +87,10 @@ public void testFunctionsStatsGenerate() {
CompletableFuture<InstanceCommunication.MetricsData>
metricsDataCompletableFuture = new CompletableFuture<>();
InstanceCommunication.MetricsData metricsData =
InstanceCommunication.MetricsData.newBuilder()
.putMetrics(
- "__function_total_processed__",
+ "pulsar_function_processed_total",
InstanceCommunication.MetricsData.DataDigest.newBuilder()
.setCount(100.0).setMax(200.0).setSum(300.0).setMin(0.0).build())
- .putMetrics("__function_process_latency_ms__",
+ .putMetrics("pulsar_function_process_latency_ms",
InstanceCommunication.MetricsData.DataDigest.newBuilder()
.setCount(10.0).setMax(20.0).setSum(30.0).setMin(0.0).build())
.build();
@@ -127,56 +127,56 @@ public void testFunctionsStatsGenerate() {
Assert.assertEquals(metrics.size(), 8);
System.out.println("metrics: " + metrics);
- Metric m = metrics.get("__function_total_processed__count");
+ Metric m = metrics.get("pulsar_function_processed_total_count");
assertEquals(m.tags.get("cluster"), "default");
assertEquals(m.tags.get("instanceId"), "0");
assertEquals(m.tags.get("name"), "func-1");
assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
assertEquals(m.value, 100.0);
- m = metrics.get("__function_total_processed__max");
+ m = metrics.get("pulsar_function_processed_total_max");
assertEquals(m.tags.get("cluster"), "default");
assertEquals(m.tags.get("instanceId"), "0");
assertEquals(m.tags.get("name"), "func-1");
assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
assertEquals(m.value, 200.0);
- m = metrics.get("__function_total_processed__sum");
+ m = metrics.get("pulsar_function_processed_total_sum");
assertEquals(m.tags.get("cluster"), "default");
assertEquals(m.tags.get("instanceId"), "0");
assertEquals(m.tags.get("name"), "func-1");
assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
assertEquals(m.value, 300.0);
- m = metrics.get("__function_total_processed__min");
+ m = metrics.get("pulsar_function_processed_total_min");
assertEquals(m.tags.get("cluster"), "default");
assertEquals(m.tags.get("instanceId"), "0");
assertEquals(m.tags.get("name"), "func-1");
assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
assertEquals(m.value, 0.0);
- m = metrics.get("__function_process_latency_ms__count");
+ m = metrics.get("pulsar_function_process_latency_ms_count");
assertEquals(m.tags.get("cluster"), "default");
assertEquals(m.tags.get("instanceId"), "0");
assertEquals(m.tags.get("name"), "func-1");
assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
assertEquals(m.value, 10.0);
- m = metrics.get("__function_process_latency_ms__max");
+ m = metrics.get("pulsar_function_process_latency_ms_max");
assertEquals(m.tags.get("cluster"), "default");
assertEquals(m.tags.get("instanceId"), "0");
assertEquals(m.tags.get("name"), "func-1");
assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
assertEquals(m.value, 20.0);
- m = metrics.get("__function_process_latency_ms__sum");
+ m = metrics.get("pulsar_function_process_latency_ms_sum");
assertEquals(m.tags.get("cluster"), "default");
assertEquals(m.tags.get("instanceId"), "0");
assertEquals(m.tags.get("name"), "func-1");
assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
assertEquals(m.value, 30.0);
- m = metrics.get("__function_process_latency_ms__min");
+ m = metrics.get("pulsar_function_process_latency_ms_min");
assertEquals(m.tags.get("cluster"), "default");
assertEquals(m.tags.get("instanceId"), "0");
assertEquals(m.tags.get("name"), "func-1");
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 066ca816a5..ffd1bd6bff 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -21,6 +21,7 @@
import com.google.common.collect.Sets;
import com.google.protobuf.InvalidProtocolBufferException;
import io.netty.util.concurrent.DefaultThreadFactory;
+import io.prometheus.client.CollectorRegistry;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.MessageId;
@@ -128,7 +129,7 @@ public void setup() {
public void stop() {
this.executor.shutdown();
}
-
+
@Test
public void testSchedule() throws Exception {
@@ -141,7 +142,7 @@ public void testSchedule() throws Exception {
functionMetaDataList.add(function1);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null,
"dummy", new ClearTextSecretsProvider());
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null,
"dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
@@ -187,7 +188,7 @@ public void testNothingNewToSchedule() throws Exception {
functionMetaDataList.add(function1);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null,
"dummy", new ClearTextSecretsProvider());
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null,
"dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
@@ -234,7 +235,7 @@ public void testAddingFunctions() throws Exception {
functionMetaDataList.add(function2);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null,
"dummy", new ClearTextSecretsProvider());
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null,
"dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
@@ -294,7 +295,7 @@ public void testDeletingFunctions() throws Exception {
functionMetaDataList.add(function1);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null,
"dummy", new ClearTextSecretsProvider());
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null,
"dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
@@ -359,7 +360,8 @@ public void testScalingUp() throws Exception {
functionMetaDataList.add(function2);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null,
"dummy", new ClearTextSecretsProvider());
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null,
"dummy", new ClearTextSecretsProvider
+ (), new CollectorRegistry());
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
@@ -470,7 +472,7 @@ public void testScalingDown() throws Exception {
functionMetaDataList.add(function2);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null,
"dummy", new ClearTextSecretsProvider());
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null,
"dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
@@ -597,7 +599,7 @@ public void testHeartbeatFunction() throws Exception {
functionMetaDataList.add(function2);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null,
"dummy", new ClearTextSecretsProvider());
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null,
"dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
Map<String, Map<String, Function.Assignment>> currentAssignments = new
HashMap<>();
@@ -651,7 +653,7 @@ public void testUpdate() throws Exception {
functionMetaDataList.add(function2);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null,
"dummy", new ClearTextSecretsProvider());
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null,
"dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
@@ -784,7 +786,7 @@ public void testAssignmentWorkerDoesNotExist() throws
InterruptedException, NoSu
functionMetaDataList.add(function2);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null,
"dummy", new ClearTextSecretsProvider());
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null,
"dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services