jerrypeng closed pull request #2930: Expose metrics via http port in function instance
URL: https://github.com/apache/pulsar/pull/2930
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff once a pull request is merged, the diff
is reproduced below for the sake of provenance:

Because this pull request comes from a fork, the diff is supplied below
(GitHub would not otherwise display it):

diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 406fe13db6..d13964df34 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -18,25 +18,12 @@
  */
 package org.apache.pulsar.functions.instance;
 
-import static com.google.common.base.Preconditions.checkState;
-
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Summary;
 import lombok.Getter;
 import lombok.Setter;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -55,9 +42,24 @@
 import org.apache.pulsar.io.core.SourceContext;
 import org.slf4j.Logger;
 
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkState;
+
 /**
  * This class implements the Context interface exposed to the user.
  */
+
 class ContextImpl implements Context, SinkContext, SourceContext {
     private InstanceConfig config;
     private Logger logger;
@@ -92,7 +94,6 @@ public void update(double value) {
         }
     }
 
-    private ConcurrentMap<String, AccumulatedMetricDatum> 
currentAccumulatedMetrics;
     private ConcurrentMap<String, AccumulatedMetricDatum> accumulatedMetrics;
 
     private Map<String, Producer<?>> publishProducers;
@@ -110,11 +111,21 @@ public void update(double value) {
     private StateContextImpl stateContext;
     private Map<String, Object> userConfigs;
 
+    Map<String, String[]> userMetricsLabels = new HashMap<>();
+    private final String[] metricsLabels;
+    private final Summary userMetricsSummary;
+
+    private final static String[] userMetricsLabelNames;
+    static {
+        // add label to indicate user metric
+        userMetricsLabelNames = Arrays.copyOf(FunctionStats.metricsLabelNames, 
FunctionStats.metricsLabelNames.length + 1);
+        userMetricsLabelNames[FunctionStats.metricsLabelNames.length] = 
"metric";
+    }
+
     public ContextImpl(InstanceConfig config, Logger logger, PulsarClient 
client, List<String> inputTopics,
-                       SecretsProvider secretsProvider) {
+                       SecretsProvider secretsProvider, CollectorRegistry 
collectorRegistry, String[] metricsLabels) {
         this.config = config;
         this.logger = logger;
-        this.currentAccumulatedMetrics = new ConcurrentHashMap<>();
         this.accumulatedMetrics = new ConcurrentHashMap<>();
         this.publishProducers = new HashMap<>();
         this.inputTopics = inputTopics;
@@ -138,6 +149,17 @@ public ContextImpl(InstanceConfig config, Logger logger, 
PulsarClient client, Li
         } else {
             secretsMap = new HashMap<>();
         }
+
+        this.metricsLabels = metricsLabels;
+        this.userMetricsSummary = Summary.build()
+                .name("pulsar_function_user_metric")
+                .help("Pulsar Function user defined metric.")
+                .labelNames(userMetricsLabelNames)
+                .quantile(0.5, 0.01)
+                .quantile(0.9, 0.01)
+                .quantile(0.99, 0.01)
+                .quantile(0.999, 0.01)
+                .register(collectorRegistry);
     }
 
     public void setCurrentMessageContext(Record<?> record) {
@@ -320,8 +342,16 @@ public ByteBuffer getState(String key) {
 
     @Override
     public void recordMetric(String metricName, double value) {
-        currentAccumulatedMetrics.putIfAbsent(metricName, new 
AccumulatedMetricDatum());
-        currentAccumulatedMetrics.get(metricName).update(value);
+        userMetricsLabels.computeIfAbsent(metricName,
+                s -> {
+                    String[] userMetricLabels = Arrays.copyOf(metricsLabels, 
metricsLabels.length + 1);
+                    userMetricLabels[userMetricLabels.length - 1] = metricName;
+                    return userMetricLabels;
+                });
+
+        
userMetricsSummary.labels(userMetricsLabels.get(metricName)).observe(value);
+        accumulatedMetrics.putIfAbsent(metricName, new 
AccumulatedMetricDatum());
+        accumulatedMetrics.get(metricName).update(value);
     }
 
     public MetricsData getAndResetMetrics() {
@@ -331,9 +361,8 @@ public MetricsData getAndResetMetrics() {
     }
 
     public void resetMetrics() {
+        userMetricsSummary.clear();
         this.accumulatedMetrics.clear();
-        this.accumulatedMetrics.putAll(currentAccumulatedMetrics);
-        this.currentAccumulatedMetrics.clear();
     }
 
     public MetricsData getMetrics() {
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java
index 15b01f7703..c45699722e 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java
@@ -21,6 +21,7 @@
 import com.google.common.collect.EvictingQueue;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
 import io.prometheus.client.Summary;
 import lombok.Getter;
 import lombok.Setter;
@@ -35,7 +36,16 @@
 @Setter
 public class FunctionStats {
 
-    private static final String[] metricsLabelNames = {"tenant", "namespace", 
"name", "instance_id"};
+    static final String[] metricsLabelNames = {"tenant", "namespace", 
"function", "instance_id", "cluster"};
+
+    /** Declare metric names **/
+    static final String PULSAR_FUNCTION_PROCESSED_TOTAL = 
"pulsar_function_processed_total";
+    static final String PULSAR_FUNCTION_PROCESSED_SUCCESSFULLY_TOTAL = 
"pulsar_function_processed_successfully_total";
+    static final String PULSAR_FUNCTION_SYSTEM_EXCEPTIONS_TOTAL = 
"pulsar_function_system_exceptions_total";
+    static final String PULSAR_FUNCTION_USER_EXCEPTIONS_TOTAL = 
"pulsar_function_user_exceptions_total";
+    static final String PULSAR_FUNCTION_PROCESS_LATENCY_MS = 
"pulsar_function_process_latency_ms";
+    static final String PULSAR_FUNCTION_LAST_INVOCATION = 
"pulsar_function_last_invocation";
+    static final String PULSAR_FUNCTION_RECEIVED_TOTAL = 
"pulsar_function_received_total";
 
     /** Declare Prometheus stats **/
 
@@ -49,6 +59,10 @@
 
     final Summary statProcessLatency;
 
+    final Gauge statlastInvocation;
+
+    final Counter statTotalRecordsRecieved;
+
     CollectorRegistry functionCollectorRegistry;
 
     @Getter
@@ -56,47 +70,56 @@
     @Getter
     private 
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> 
latestSystemExceptions = EvictingQueue.create(10);
 
-    @Getter
-    @Setter
-    private long lastInvocationTime = 0;
-
-    public FunctionStats() {
+    public FunctionStats(CollectorRegistry collectorRegistry) {
         // Declare function local collector registry so that it will not clash 
with other function instances'
         // metrics collection especially in threaded mode
         functionCollectorRegistry = new CollectorRegistry();
 
         statTotalProcessed = Counter.build()
-                .name("__function_total_processed__")
+                .name(PULSAR_FUNCTION_PROCESSED_TOTAL)
                 .help("Total number of messages processed.")
                 .labelNames(metricsLabelNames)
-                .register(functionCollectorRegistry);
+                .register(collectorRegistry);
 
         statTotalProcessedSuccessfully = Counter.build()
-                .name("__function_total_successfully_processed__")
+                .name(PULSAR_FUNCTION_PROCESSED_SUCCESSFULLY_TOTAL)
                 .help("Total number of messages processed successfully.")
                 .labelNames(metricsLabelNames)
-                .register(functionCollectorRegistry);
+                .register(collectorRegistry);
 
         statTotalSysExceptions = Counter.build()
-                .name("__function_total_system_exceptions__")
+                .name(PULSAR_FUNCTION_SYSTEM_EXCEPTIONS_TOTAL)
                 .help("Total number of system exceptions.")
                 .labelNames(metricsLabelNames)
-                .register(functionCollectorRegistry);
+                .register(collectorRegistry);
 
         statTotalUserExceptions = Counter.build()
-                .name("__function_total_user_exceptions__")
+                .name(PULSAR_FUNCTION_USER_EXCEPTIONS_TOTAL)
                 .help("Total number of user exceptions.")
                 .labelNames(metricsLabelNames)
-                .register(functionCollectorRegistry);
+                .register(collectorRegistry);
 
         statProcessLatency = Summary.build()
-                .name("__function_process_latency_ms__").help("Process latency 
in milliseconds.")
+                .name(PULSAR_FUNCTION_PROCESS_LATENCY_MS)
+                .help("Process latency in milliseconds.")
                 .quantile(0.5, 0.01)
                 .quantile(0.9, 0.01)
                 .quantile(0.99, 0.01)
                 .quantile(0.999, 0.01)
                 .labelNames(metricsLabelNames)
-                .register(functionCollectorRegistry);
+                .register(collectorRegistry);
+
+        statlastInvocation = Gauge.build()
+                .name(PULSAR_FUNCTION_LAST_INVOCATION)
+                .help("The timestamp of the last invocation of the function")
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
+
+        statTotalRecordsRecieved = Counter.build()
+                .name(PULSAR_FUNCTION_RECEIVED_TOTAL)
+                .help("Total number of messages received from source.")
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
     }
 
     public void addUserException(Exception ex) {
@@ -120,8 +143,9 @@ public void reset() {
         statTotalSysExceptions.clear();
         statTotalUserExceptions.clear();
         statProcessLatency.clear();
+        statlastInvocation.clear();
+        statTotalRecordsRecieved.clear();
         latestUserExceptions.clear();
         latestSystemExceptions.clear();
-        lastInvocationTime = 0;
     }
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
index 040af91241..51ec0d488c 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
@@ -41,6 +41,7 @@
     private FunctionDetails functionDetails;
     private int maxBufferedTuples;
     private int port;
+    private String clusterName;
 
     /**
      * Get the string representation of {@link #getInstanceId()}.
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 9e5ae51807..ab900ac40c 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -22,6 +22,7 @@
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import io.netty.buffer.ByteBuf;
+import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.Summary;
 import lombok.AccessLevel;
 import lombok.Getter;
@@ -45,6 +46,8 @@
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.functions.ConsumerConfig;
+import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.instance.state.StateContextImpl;
@@ -58,8 +61,6 @@
 import org.apache.pulsar.functions.sink.PulsarSinkDisable;
 import org.apache.pulsar.functions.source.PulsarSource;
 import org.apache.pulsar.functions.source.PulsarSourceConfig;
-import org.apache.pulsar.common.functions.ConsumerConfig;
-import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.StateUtils;
@@ -116,6 +117,8 @@
     private Sink sink;
 
     private final SecretsProvider secretsProvider;
+
+    private CollectorRegistry collectorRegistry;
     private final String[] metricsLabels;
 
     public JavaInstanceRunnable(InstanceConfig instanceConfig,
@@ -123,19 +126,25 @@ public JavaInstanceRunnable(InstanceConfig instanceConfig,
                                 String jarFile,
                                 PulsarClient pulsarClient,
                                 String stateStorageServiceUrl,
-                                SecretsProvider secretsProvider) {
+                                SecretsProvider secretsProvider,
+                                CollectorRegistry collectorRegistry) {
         this.instanceConfig = instanceConfig;
         this.fnCache = fnCache;
         this.jarFile = jarFile;
         this.client = (PulsarClientImpl) pulsarClient;
         this.stateStorageServiceUrl = stateStorageServiceUrl;
-        this.stats = new FunctionStats();
+        this.stats = new FunctionStats(collectorRegistry);
         this.secretsProvider = secretsProvider;
+        this.collectorRegistry = collectorRegistry;
         this.metricsLabels = new String[]{
                 instanceConfig.getFunctionDetails().getTenant(),
-                instanceConfig.getFunctionDetails().getNamespace(),
-                instanceConfig.getFunctionDetails().getName(),
-                String.valueOf(instanceConfig.getInstanceId())
+                String.format("%s/%s", 
instanceConfig.getFunctionDetails().getTenant(),
+                        instanceConfig.getFunctionDetails().getNamespace()),
+                String.format("%s/%s/%s", 
instanceConfig.getFunctionDetails().getTenant(),
+                        instanceConfig.getFunctionDetails().getNamespace(),
+                        instanceConfig.getFunctionDetails().getName()),
+                String.valueOf(instanceConfig.getInstanceId()),
+                instanceConfig.getClusterName()
         };
     }
 
@@ -181,7 +190,7 @@ ContextImpl setupContext() {
         }
         Logger instanceLog = LoggerFactory.getLogger(
                 "function-" + instanceConfig.getFunctionDetails().getName());
-        return new ContextImpl(instanceConfig, instanceLog, client, 
inputTopics, secretsProvider);
+        return new ContextImpl(instanceConfig, instanceLog, client, 
inputTopics, secretsProvider, collectorRegistry, metricsLabels);
     }
 
     /**
@@ -201,6 +210,9 @@ public void run() {
             while (true) {
                 currentRecord = readInput();
 
+                // increment number of records received from source
+                stats.statTotalRecordsRecieved.labels(metricsLabels).inc();
+
                 if 
(instanceConfig.getFunctionDetails().getProcessingGuarantees() == 
org.apache.pulsar.functions
                         .proto.Function.ProcessingGuarantees.ATMOST_ONCE) {
                     if (instanceConfig.getFunctionDetails().getAutoAck()) {
@@ -212,7 +224,7 @@ public void run() {
                 JavaExecutionResult result;
 
                 // set last invocation time
-                stats.setLastInvocationTime(System.currentTimeMillis());
+                
stats.statlastInvocation.labels(metricsLabels).set(System.currentTimeMillis());
 
                 // start time for process latency stat
                 Summary.Timer requestTimer = 
stats.statProcessLatency.labels(metricsLabels).startTimer();
@@ -331,7 +343,6 @@ private void processResult(Record srcRecord,
             stats.addUserException(result.getUserException() );
             srcRecord.fail();
         } else {
-            stats.statTotalProcessedSuccessfully.labels(metricsLabels).inc();
             if (result.getResult() != null) {
                 sendOutputMessage(srcRecord, result.getResult());
             } else {
@@ -340,6 +351,8 @@ private void processResult(Record srcRecord,
                     srcRecord.ack();
                 }
             }
+            // increment total successfully processed
+            stats.statTotalProcessedSuccessfully.labels(metricsLabels).inc();
         }
     }
 
@@ -439,22 +452,24 @@ public void resetMetrics() {
 
     private Builder createMetricsDataBuilder() {
         InstanceCommunication.MetricsData.Builder bldr = 
InstanceCommunication.MetricsData.newBuilder();
-        addSystemMetrics("__total_processed__", 
stats.statTotalProcessed.labels(metricsLabels).get(), bldr);
-        addSystemMetrics("__total_successfully_processed__", 
stats.statTotalProcessedSuccessfully.labels(metricsLabels).get(), bldr);
-        addSystemMetrics("__total_system_exceptions__",  
stats.statTotalSysExceptions.labels(metricsLabels).get(), bldr);
-        addSystemMetrics("__total_user_exceptions__", 
stats.statTotalUserExceptions.labels(metricsLabels).get(), bldr);
-        addSystemMetrics("__avg_latency_ms__",
+        addSystemMetrics(FunctionStats.PULSAR_FUNCTION_PROCESSED_TOTAL, 
stats.statTotalProcessed.labels(metricsLabels).get(), bldr);
+        
addSystemMetrics(FunctionStats.PULSAR_FUNCTION_PROCESSED_SUCCESSFULLY_TOTAL, 
stats.statTotalProcessedSuccessfully.labels(metricsLabels).get(), bldr);
+        
addSystemMetrics(FunctionStats.PULSAR_FUNCTION_SYSTEM_EXCEPTIONS_TOTAL,  
stats.statTotalSysExceptions.labels(metricsLabels).get(), bldr);
+        addSystemMetrics(FunctionStats.PULSAR_FUNCTION_USER_EXCEPTIONS_TOTAL, 
stats.statTotalUserExceptions.labels(metricsLabels).get(), bldr);
+        addSystemMetrics(FunctionStats.PULSAR_FUNCTION_RECEIVED_TOTAL, 
stats.statTotalRecordsRecieved.labels(metricsLabels).get(), bldr);
+        addSystemMetrics(FunctionStats.PULSAR_FUNCTION_PROCESS_LATENCY_MS,
                 stats.statProcessLatency.labels(metricsLabels).get().count <= 
0.0
                         ? 0 : 
stats.statProcessLatency.labels(metricsLabels).get().sum / 
stats.statProcessLatency.labels(metricsLabels).get().count,
                 bldr);
+        addSystemMetrics(FunctionStats.PULSAR_FUNCTION_LAST_INVOCATION, 
stats.statlastInvocation.labels(metricsLabels).get(), bldr);
         return bldr;
     }
 
     public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() {
         InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = 
InstanceCommunication.FunctionStatus.newBuilder();
-        
functionStatusBuilder.setNumProcessed((long)stats.statTotalProcessed.labels(metricsLabels).get());
-        
functionStatusBuilder.setNumSuccessfullyProcessed((long)stats.statTotalProcessedSuccessfully.labels(metricsLabels).get());
-        
functionStatusBuilder.setNumUserExceptions((long)stats.statTotalUserExceptions.labels(metricsLabels).get());
+        functionStatusBuilder.setNumProcessed((long) 
stats.statTotalProcessed.labels(metricsLabels).get());
+        functionStatusBuilder.setNumSuccessfullyProcessed((long) 
stats.statTotalProcessedSuccessfully.labels(metricsLabels).get());
+        functionStatusBuilder.setNumUserExceptions((long) 
stats.statTotalUserExceptions.labels(metricsLabels).get());
         stats.getLatestUserExceptions().forEach(ex -> {
             functionStatusBuilder.addLatestUserExceptions(ex);
         });
@@ -464,8 +479,9 @@ private Builder createMetricsDataBuilder() {
         });
         functionStatusBuilder.setAverageLatency(
                 stats.statProcessLatency.labels(metricsLabels).get().count == 
0.0
-                        ? 0 : 
stats.statProcessLatency.labels(metricsLabels).get().sum / 
stats.statProcessLatency.labels(metricsLabels).get().count);
-        
functionStatusBuilder.setLastInvocationTime(stats.getLastInvocationTime());
+                        ? 0 : 
stats.statProcessLatency.labels(metricsLabels).get().sum / 
stats.statProcessLatency
+                        .labels(metricsLabels).get().count);
+        functionStatusBuilder.setLastInvocationTime((long) 
stats.statlastInvocation.labels(metricsLabels).get());
         return functionStatusBuilder;
     }
 
diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py 
b/pulsar-functions/instance/src/main/python/contextimpl.py
index 83a63aa98a..9a37d59ab1 100644
--- a/pulsar-functions/instance/src/main/python/contextimpl.py
+++ b/pulsar-functions/instance/src/main/python/contextimpl.py
@@ -31,6 +31,9 @@
 import util
 import InstanceCommunication_pb2
 
+from prometheus_client import Summary
+from function_stats import Stats
+
 # For keeping track of accumulated metrics
 class AccumulatedMetricDatum(object):
   def __init__(self):
@@ -39,7 +42,7 @@ def __init__(self):
     self.max = float('-inf')
     self.min = float('inf')
 
-  def record(self, value):
+  def update(self, value):
     self.count += 1
     self.sum += value
     if value > self.max:
@@ -48,14 +51,17 @@ def record(self, value):
       self.min = value
 
 class ContextImpl(pulsar.Context):
-  def __init__(self, instance_config, logger, pulsar_client, user_code, 
consumers, secrets_provider):
+
+  # add label to indicate user metric
+  user_metrics_label_names = Stats.metrics_label_names + ["metric"]
+
+  def __init__(self, instance_config, logger, pulsar_client, user_code, 
consumers, secrets_provider, metrics_labels):
     self.instance_config = instance_config
     self.log = logger
     self.pulsar_client = pulsar_client
     self.user_code_dir = os.path.dirname(user_code)
     self.consumers = consumers
     self.secrets_provider = secrets_provider
-    self.current_accumulated_metrics = {}
     self.accumulated_metrics = {}
     self.publish_producers = {}
     self.publish_serializers = {}
@@ -69,6 +75,12 @@ def __init__(self, instance_config, logger, pulsar_client, 
user_code, consumers,
       if instance_config.function_details.secretsMap \
       else {}
 
+    self.metrics_labels = metrics_labels
+    self.user_metrics_labels = dict()
+    self.user_metrics_summary = Summary("pulsar_function_user_metric",
+                                    'Pulsar Function user defined metric',
+                                        ContextImpl.user_metrics_label_names)
+
   # Called on a per message basis to set the context for the current message
   def set_current_message_context(self, msgid, topic):
     self.current_message_id = msgid
@@ -117,9 +129,12 @@ def get_secret(self, secret_key):
     return self.secrets_provider.provide_secret(secret_key, 
self.secrets_map[secret_key])
 
   def record_metric(self, metric_name, metric_value):
-    if not metric_name in self.current_accumulated_metrics:
-      self.current_accumulated_metrics[metric_name] = AccumulatedMetricDatum()
-    self.current_accumulated_metrics[metric_name].update(metric_value)
+    if metric_name not in self.user_metrics_labels:
+      self.user_metrics_labels[metric_name] = self.metrics_labels + 
[metric_name]
+    
self.user_metrics_summary.labels(*self.user_metrics_labels[metric_name]).observe(metric_value)
+    if not metric_name in self.accumulated_metrics:
+      self.accumulated_metrics[metric_name] = AccumulatedMetricDatum()
+    self.accumulated_metrics[metric_name].update(metric_value)
 
   def get_output_topic(self):
     return self.instance_config.function_details.output
@@ -164,17 +179,16 @@ def get_and_reset_metrics(self):
 
   def reset_metrics(self):
     # TODO: Make it thread safe
+    for labels in self.user_metrics_labels.values():
+      self.user_metrics_summary.labels(*labels)._sum.set(0.0)
+      self.user_metrics_summary.labels(*labels)._count.set(0.0)
     self.accumulated_metrics.clear()
-    self.accumulated_metrics.update(self.current_accumulated_metrics)
-    self.current_accumulated_metrics.clear()
 
   def get_metrics(self):
     metrics = InstanceCommunication_pb2.MetricsData()
     for metric_name, accumulated_metric in self.accumulated_metrics.items():
-      m = InstanceCommunication_pb2.MetricsData.DataDigest()
-      m.count = accumulated_metric.count
-      m.sum = accumulated_metric.sum
-      m.max = accumulated_metric.max
-      m.min = accumulated_metric.min
-      metrics.metrics[metric_name] = m
+      metrics.metrics[metric_name].count = accumulated_metric.count
+      metrics.metrics[metric_name].sum = accumulated_metric.sum
+      metrics.metrics[metric_name].max = accumulated_metric.max
+      metrics.metrics[metric_name].min = accumulated_metric.min
     return metrics
diff --git a/pulsar-functions/instance/src/main/python/function_stats.py 
b/pulsar-functions/instance/src/main/python/function_stats.py
new file mode 100644
index 0000000000..13b3f8442f
--- /dev/null
+++ b/pulsar-functions/instance/src/main/python/function_stats.py
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import traceback
+import time
+
+from prometheus_client import Counter, Summary, Gauge
+
+# We keep track of the following metrics
+class Stats(object):
+  metrics_label_names = ['tenant', 'namespace', 'function', 'instance_id', 
'cluster']
+
+  TOTAL_PROCESSED = 'pulsar_function_processed_total'
+  TOTAL_SUCCESSFULLY_PROCESSED = 'pulsar_function_processed_successfully_total'
+  TOTAL_SYSTEM_EXCEPTIONS = 'pulsar_function_system_exceptions_total'
+  TOTAL_USER_EXCEPTIONS = 'pulsar_function_user_exceptions_total'
+  PROCESS_LATENCY_MS = 'pulsar_function_process_latency_ms'
+  LAST_INVOCATION = 'pulsar_function_last_invocation'
+  TOTAL_RECEIVED = 'pulsar_function_received_total'
+
+  # Declare Prometheus
+  stat_total_processed = Counter(TOTAL_PROCESSED, 'Total number of messages 
processed.', metrics_label_names)
+  stat_total_processed_successfully = Counter(TOTAL_SUCCESSFULLY_PROCESSED,
+                                              'Total number of messages 
processed successfully.', metrics_label_names)
+  stat_total_sys_exceptions = Counter(TOTAL_SYSTEM_EXCEPTIONS, 'Total number 
of system exceptions.',
+                                      metrics_label_names)
+  stat_total_user_exceptions = Counter(TOTAL_USER_EXCEPTIONS, 'Total number of 
user exceptions.',
+                                       metrics_label_names)
+
+  stat_process_latency_ms = Summary(PROCESS_LATENCY_MS, 'Process latency in 
milliseconds.', metrics_label_names)
+
+  stat_last_invocation = Gauge(LAST_INVOCATION, 'The timestamp of the last 
invocation of the function.', metrics_label_names)
+
+  stat_total_received = Counter(TOTAL_RECEIVED, 'Total number of messages 
received from source.', metrics_label_names)
+
+  latest_user_exception = []
+  latest_sys_exception = []
+
+  def add_user_exception(self):
+    self.latest_sys_exception.append((traceback.format_exc(), int(time.time() 
* 1000)))
+    if len(self.latest_sys_exception) > 10:
+      self.latest_sys_exception.pop(0)
+
+  def add_sys_exception(self):
+    self.latest_sys_exception.append((traceback.format_exc(), int(time.time() 
* 1000)))
+    if len(self.latest_sys_exception) > 10:
+      self.latest_sys_exception.pop(0)
+
+  def reset(self, metrics_labels):
+    self.latest_user_exception = []
+    self.latest_sys_exception = []
+    self.stat_total_processed.labels(*metrics_labels)._value.set(0.0)
+    
self.stat_total_processed_successfully.labels(*metrics_labels)._value.set(0.0)
+    self.stat_total_user_exceptions.labels(*metrics_labels)._value.set(0.0)
+    self.stat_total_sys_exceptions.labels(*metrics_labels)._value.set(0.0)
+    self.stat_process_latency_ms.labels(*metrics_labels)._sum.set(0.0)
+    self.stat_process_latency_ms.labels(*metrics_labels)._count.set(0.0)
+    self.stat_last_invocation.labels(*metrics_labels).set(0.0)
+    self.stat_total_received.labels(*metrics_labels)._value.set(0.0)
\ No newline at end of file
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py 
b/pulsar-functions/instance/src/main/python/python_instance.py
index 6f74461e85..1036b23132 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -31,14 +31,8 @@
 except:
   import queue
 import threading
-from functools import partial
-from collections import namedtuple
-from threading import Timer
-from prometheus_client import Counter, Summary
-import traceback
 import sys
 import re
-
 import pulsar
 import contextimpl
 import Function_pb2
@@ -46,6 +40,11 @@
 import util
 import InstanceCommunication_pb2
 
+from functools import partial
+from collections import namedtuple
+from threading import Timer
+from function_stats import Stats
+
 Log = log.Log
 # Equivalent of the InstanceConfig in Java
 InstanceConfig = namedtuple('InstanceConfig', 'instance_id function_id 
function_version function_details max_buffered_tuples')
@@ -68,55 +67,9 @@ def base64ify(bytes_or_str):
     else:
         return output_bytes
 
-# We keep track of the following metrics
-class Stats(object):
-  metrics_label_names = ['tenant', 'namespace', 'name', 'instance_id']
-
-  TOTAL_PROCESSED = '__function_total_processed__'
-  TOTAL_SUCCESSFULLY_PROCESSED = '__function_total_successfully_processed__'
-  TOTAL_SYSTEM_EXCEPTIONS = '__function_total_system_exceptions__'
-  TOTAL_USER_EXCEPTIONS = '__function_total_user_exceptions__'
-  PROCESS_LATENCY_MS = '__function_process_latency_ms__'
-
-  # Declare Prometheus
-  stat_total_processed = Counter(TOTAL_PROCESSED, 'Total number of messages 
processed.', metrics_label_names)
-  stat_total_processed_successfully = Counter(TOTAL_SUCCESSFULLY_PROCESSED,
-                                              'Total number of messages 
processed successfully.', metrics_label_names)
-  stat_total_sys_exceptions = Counter(TOTAL_SYSTEM_EXCEPTIONS, 'Total number 
of system exceptions.',
-                                      metrics_label_names)
-  stat_total_user_exceptions = Counter(TOTAL_USER_EXCEPTIONS, 'Total number of 
user exceptions.',
-                                       metrics_label_names)
-
-  stats_process_latency_ms = Summary(PROCESS_LATENCY_MS, 'Process latency in 
milliseconds.', metrics_label_names)
-
-  latest_user_exception = []
-  latest_sys_exception = []
-
-  last_invocation_time = 0.0
-
-  def add_user_exception(self):
-    self.latest_sys_exception.append((traceback.format_exc(), int(time.time() 
* 1000)))
-    if len(self.latest_sys_exception) > 10:
-      self.latest_sys_exception.pop(0)
-
-  def add_sys_exception(self):
-    self.latest_sys_exception.append((traceback.format_exc(), int(time.time() 
* 1000)))
-    if len(self.latest_sys_exception) > 10:
-      self.latest_sys_exception.pop(0)
-
-  def reset(self, metrics_labels):
-    self.latest_user_exception = []
-    self.latest_sys_exception = []
-    self.stat_total_processed.labels(*metrics_labels)._value.set(0.0)
-    
self.stat_total_processed_successfully.labels(*metrics_labels)._value.set(0.0)
-    self.stat_total_user_exceptions.labels(*metrics_labels)._value.set(0.0)
-    self.stat_total_sys_exceptions.labels(*metrics_labels)._value.set(0.0)
-    self.stats_process_latency_ms.labels(*metrics_labels)._sum.set(0)
-    self.stats_process_latency_ms.labels(*metrics_labels)._count.set(0);
-    self.last_invocation_time = 0.0
-
 class PythonInstance(object):
-  def __init__(self, instance_id, function_id, function_version, 
function_details, max_buffered_tuples, expected_healthcheck_interval, 
user_code, pulsar_client, secrets_provider):
+  def __init__(self, instance_id, function_id, function_version, 
function_details, max_buffered_tuples,
+               expected_healthcheck_interval, user_code, pulsar_client, 
secrets_provider, cluster_name):
     self.instance_config = InstanceConfig(instance_id, function_id, 
function_version, function_details, max_buffered_tuples)
     self.user_code = user_code
     self.queue = queue.Queue(max_buffered_tuples)
@@ -140,7 +93,10 @@ def __init__(self, instance_id, function_id, 
function_version, function_details,
     self.timeout_ms = function_details.source.timeoutMs if 
function_details.source.timeoutMs > 0 else None
     self.expected_healthcheck_interval = expected_healthcheck_interval
     self.secrets_provider = secrets_provider
-    self.metrics_labels = [function_details.tenant, 
function_details.namespace, function_details.name, instance_id]
+    self.metrics_labels = [function_details.tenant,
+                           "%s/%s" % (function_details.tenant, 
function_details.namespace),
+                           "%s/%s/%s" % (function_details.tenant, 
function_details.namespace, function_details.name),
+                           instance_id, cluster_name]
 
   def health_check(self):
     self.last_health_check_ts = time.time()
@@ -210,7 +166,9 @@ def run(self):
     except:
       self.function_purefunction = function_kclass
 
-    self.contextimpl = contextimpl.ContextImpl(self.instance_config, Log, 
self.pulsar_client, self.user_code, self.consumers, self.secrets_provider)
+    self.contextimpl = contextimpl.ContextImpl(self.instance_config, Log, 
self.pulsar_client,
+                                               self.user_code, self.consumers,
+                                               self.secrets_provider, 
self.metrics_labels)
     # Now launch a thread that does execution
     self.execution_thread = threading.Thread(target=self.actual_execution)
     self.execution_thread.start()
@@ -242,13 +200,13 @@ def actual_execution(self):
         try:
           # get user function start time for statistic calculation
           start_time = time.time()
-          self.stats.last_invocation_time = start_time * 1000.0
+          
Stats.stat_last_invocation.labels(*self.metrics_labels).set(start_time * 1000.0)
           if self.function_class is not None:
             output_object = self.function_class.process(input_object, 
self.contextimpl)
           else:
             output_object = self.function_purefunction.process(input_object)
           successfully_executed = True
-          
Stats.stats_process_latency_ms.labels(*self.metrics_labels).observe((time.time()
 - start_time) * 1000.0)
+          
Stats.stat_process_latency_ms.labels(*self.metrics_labels).observe((time.time() 
- start_time) * 1000.0)
           Stats.stat_total_processed.labels(*self.metrics_labels).inc()
         except Exception as e:
           Log.exception("Exception while executing user method")
@@ -309,6 +267,8 @@ def setup_producer(self):
         max_pending_messages=100000)
 
   def message_listener(self, serde, consumer, message):
+    # increment number of received records from source
+    Stats.stat_total_received.labels(*self.metrics_labels).inc()
     item = InternalMessage(message, consumer.topic(), serde, consumer)
     self.queue.put(item, True)
     if self.atmost_once and self.auto_ack:
@@ -328,14 +288,16 @@ def get_metrics(self):
     # First get any user metrics
     metrics = self.contextimpl.get_metrics()
     # Now add system metrics as well
-    self.add_system_metrics("__total_processed__", 
Stats.stat_total_processed.labels(*self.metrics_labels)._value.get(), metrics)
-    self.add_system_metrics("__total_successfully_processed__", 
Stats.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get(),
 metrics)
-    self.add_system_metrics("__total_system_exceptions__", 
Stats.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get(), 
metrics)
-    self.add_system_metrics("__total_user_exceptions__", 
Stats.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get(), 
metrics)
-    self.add_system_metrics("__avg_latency_ms__",
-                            0.0 if 
Stats.stats_process_latency_ms.labels(*self.metrics_labels)._count.get() <= 0.0
-                            else 
Stats.stats_process_latency_ms.labels(*self.metrics_labels)._sum.get() / 
Stats.stats_process_latency_ms.labels(*self.metrics_labels)._count.get(),
+    self.add_system_metrics(Stats.TOTAL_PROCESSED, 
Stats.stat_total_processed.labels(*self.metrics_labels)._value.get(), metrics)
+    self.add_system_metrics(Stats.TOTAL_SUCCESSFULLY_PROCESSED, 
Stats.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get(),
 metrics)
+    self.add_system_metrics(Stats.TOTAL_SYSTEM_EXCEPTIONS, 
Stats.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get(), 
metrics)
+    self.add_system_metrics(Stats.TOTAL_USER_EXCEPTIONS, 
Stats.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get(), 
metrics)
+    self.add_system_metrics(Stats.PROCESS_LATENCY_MS,
+                            0.0 if 
Stats.stat_process_latency_ms.labels(*self.metrics_labels)._count.get() <= 0.0
+                            else 
Stats.stat_process_latency_ms.labels(*self.metrics_labels)._sum.get() / 
Stats.stat_process_latency_ms.labels(*self.metrics_labels)._count.get(),
                             metrics)
+    self.add_system_metrics(Stats.TOTAL_RECEIVED, 
Stats.stat_total_received.labels(*self.metrics_labels)._value.get(), metrics)
+    self.add_system_metrics(Stats.LAST_INVOCATION, 
Stats.stat_last_invocation.labels(*self.metrics_labels)._value.get(), metrics)
     return metrics
 
   def add_system_metrics(self, metric_name, value, metrics):
@@ -347,23 +309,32 @@ def add_system_metrics(self, metric_name, value, metrics):
   def get_function_status(self):
     status = InstanceCommunication_pb2.FunctionStatus()
     status.running = True
-    status.numProcessed = 
long(Stats.stat_total_processed.labels(*self.metrics_labels)._value.get())
-    status.numSuccessfullyProcessed = 
long(Stats.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get())
-    status.numUserExceptions = 
long(Stats.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get())
+
+    total_processed = 
Stats.stat_total_processed.labels(*self.metrics_labels)._value.get()
+    stat_total_processed_successfully = 
Stats.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get()
+    stat_total_user_exceptions = 
Stats.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get()
+    stat_total_sys_exceptions = 
Stats.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get()
+    stat_process_latency_ms_count = 
Stats.stat_process_latency_ms.labels(*self.metrics_labels)._count.get()
+    stat_process_latency_ms_sum = 
Stats.stat_process_latency_ms.labels(*self.metrics_labels)._sum.get()
+    stat_last_invocation = 
Stats.stat_last_invocation.labels(*self.metrics_labels)._value.get()
+
+    status.numProcessed =  int(total_processed) if sys.version_info.major >= 3 
else long(total_processed)
+    status.numSuccessfullyProcessed = int(stat_total_processed_successfully) 
if sys.version_info.major >= 3 else long(stat_total_processed_successfully)
+    status.numUserExceptions = int(stat_total_user_exceptions) if 
sys.version_info.major >= 3 else long(stat_total_user_exceptions)
     status.instanceId = self.instance_config.instance_id
     for ex, tm in self.stats.latest_user_exception:
       to_add = status.latestUserExceptions.add()
       to_add.exceptionString = ex
       to_add.msSinceEpoch = tm
-    status.numSystemExceptions = 
long(Stats.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get())
+    status.numSystemExceptions = int(stat_total_sys_exceptions) if 
sys.version_info.major >= 3 else long(stat_total_sys_exceptions)
     for ex, tm in self.stats.latest_sys_exception:
       to_add = status.latestSystemExceptions.add()
       to_add.exceptionString = ex
       to_add.msSinceEpoch = tm
     status.averageLatency = 0.0 \
-      if 
Stats.stats_process_latency_ms.labels(*self.metrics_labels)._count.get() <= 0.0 
\
-      else 
Stats.stats_process_latency_ms.labels(*self.metrics_labels)._sum.get() / 
Stats.stats_process_latency_ms.labels(*self.metrics_labels)._count.get()
-    status.lastInvocationTime = long(self.stats.last_invocation_time)
+      if stat_process_latency_ms_count <= 0.0 \
+      else stat_process_latency_ms_sum / stat_process_latency_ms_count
+    status.lastInvocationTime = int(stat_last_invocation) if 
sys.version_info.major >= 3 else long(stat_last_invocation)
     return status
 
   def join(self):
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py 
b/pulsar-functions/instance/src/main/python/python_instance_main.py
index d2562697f3..2a023806a5 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -39,6 +39,8 @@
 import server
 import python_instance
 import util
+import prometheus_client
+
 from google.protobuf import json_format
 
 to_run = True
@@ -69,6 +71,7 @@ def main():
   parser.add_argument('--hostname_verification_enabled', required=False, 
help='Enable hostname verification')
   parser.add_argument('--tls_trust_cert_path', required=False, help='Tls trust 
cert file path')
   parser.add_argument('--port', required=True, help='Instance Port', type=int)
+  parser.add_argument('--metrics_port', required=True, help="Port metrics will 
be exposed on", type=int)
   parser.add_argument('--max_buffered_tuples', required=True, help='Maximum 
number of Buffered tuples')
   parser.add_argument('--logging_directory', required=True, help='Logging 
Directory')
   parser.add_argument('--logging_file', required=True, help='Log file name')
@@ -79,6 +82,7 @@ def main():
   parser.add_argument('--install_usercode_dependencies', required=False, 
help='For packaged python like wheel files, do we need to install all 
dependencies', type=bool)
   parser.add_argument('--dependency_repository', required=False, help='For 
packaged python like wheel files, which repository to pull the dependencies 
from')
   parser.add_argument('--extra_dependency_repository', required=False, 
help='For packaged python like wheel files, any extra repository to pull the 
dependencies from')
+  parser.add_argument('--cluster_name', required=True, help='The name of the 
cluster this instance is running on')
 
   args = parser.parse_args()
   function_details = Function_pb2.FunctionDetails()
@@ -166,10 +170,12 @@ def main():
                                               str(args.function_version), 
function_details,
                                               int(args.max_buffered_tuples),
                                               
int(args.expected_healthcheck_interval),
-                                              str(args.py), pulsar_client, 
secrets_provider)
+                                              str(args.py), pulsar_client, 
secrets_provider, args.cluster_name)
   pyinstance.run()
   server_instance = server.serve(args.port, pyinstance)
 
+  prometheus_client.start_http_server(args.metrics_port)
+
   global to_run
   while to_run:
     time.sleep(1)
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index e3e32fda3e..0ac3502fd6 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -30,6 +30,7 @@
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
+import io.prometheus.client.CollectorRegistry;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
@@ -75,7 +76,7 @@ public void setup() {
             logger,
             client,
             new ArrayList<>(),
-            new EnvironmentBasedSecretsProvider()
+            new EnvironmentBasedSecretsProvider(), new CollectorRegistry(), 
new String[0]
         );
     }
 
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
index 80b3b1da5a..b927c49d87 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
@@ -56,7 +56,7 @@ private static InstanceConfig createInstanceConfig(boolean 
addCustom, String out
     private JavaInstanceRunnable createRunnable(boolean addCustom, String 
outputSerde) throws Exception {
         InstanceConfig config = createInstanceConfig(addCustom, outputSerde);
         JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
-                config, null, null, null, null, null);
+                config, null, null, null, null, null, null);
         return javaInstanceRunnable;
     }
 
diff --git a/pulsar-functions/instance/src/test/python/test_python_instance.py 
b/pulsar-functions/instance/src/test/python/test_python_instance.py
index 748e5d80a8..b865a9df82 100644
--- a/pulsar-functions/instance/src/test/python/test_python_instance.py
+++ b/pulsar-functions/instance/src/test/python/test_python_instance.py
@@ -48,7 +48,7 @@ def test_context_publish(self):
     pulsar_client.create_producer = Mock(return_value=producer)
     user_code=__file__
     consumers = None
-    context_impl = ContextImpl(instance_config, logger, pulsar_client, 
user_code, consumers, None)
+    context_impl = ContextImpl(instance_config, logger, pulsar_client, 
user_code, consumers, None, None)
 
     context_impl.publish("test_topic_name", "test_message")
 
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 98986dd566..d5fb80ee73 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -29,8 +29,17 @@
 import io.grpc.Server;
 import io.grpc.ServerBuilder;
 import io.grpc.stub.StreamObserver;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.exporter.HTTPServer;
+import io.prometheus.client.hotspot.BufferPoolsExports;
+import io.prometheus.client.hotspot.ClassLoadingExports;
 import io.prometheus.client.hotspot.DefaultExports;
+import io.prometheus.client.hotspot.GarbageCollectorExports;
+import io.prometheus.client.hotspot.MemoryPoolsExports;
+import io.prometheus.client.hotspot.StandardExports;
+import io.prometheus.client.hotspot.ThreadExports;
+import io.prometheus.client.hotspot.VersionInfoExports;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
@@ -43,12 +52,14 @@
 import org.apache.pulsar.functions.utils.Reflections;
 
 import java.lang.reflect.Type;
+import java.net.InetSocketAddress;
 import java.util.Map;
 import java.util.TimerTask;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
 /**
  * A function container implemented using java thread.
@@ -99,6 +110,9 @@
     @Parameter(names = "--port", description = "Port to listen on\n", required 
= true)
     protected int port;
 
+    @Parameter(names = "--metrics_port", description = "Port metrics will be 
exposed on\n", required = true)
+    protected int metrics_port;
+
     @Parameter(names = "--max_buffered_tuples", description = "Maximum number 
of tuples to buffer\n", required = true)
     protected int maxBufferedTuples;
 
@@ -111,11 +125,15 @@
     @Parameter(names = "--secrets_provider_config", description = "The config 
that needs to be passed to secrets provider", required = false)
     protected String secretsProviderConfig;
 
+    @Parameter(names = "--cluster_name", description = "The name of the 
cluster this instance is running on", required = true)
+    protected String clusterName;
+
     private Server server;
     private RuntimeSpawner runtimeSpawner;
     private ThreadRuntimeFactory containerFactory;
     private Long lastHealthCheckTs = null;
     private ScheduledExecutorService timer;
+    private HTTPServer metricsServer;
 
     public JavaInstanceMain() { }
 
@@ -126,6 +144,7 @@ public void start() throws Exception {
         instanceConfig.setFunctionVersion(functionVersion);
         instanceConfig.setInstanceId(instanceId);
         instanceConfig.setMaxBufferedTuples(maxBufferedTuples);
+        instanceConfig.setClusterName(clusterName);
         FunctionDetails.Builder functionDetailsBuilder = 
FunctionDetails.newBuilder();
         if (functionDetailsJsonString.charAt(0) == '\'') {
             functionDetailsJsonString = functionDetailsJsonString.substring(1);
@@ -162,6 +181,9 @@ public void start() throws Exception {
         }
         secretsProvider.init(secretsProviderConfigMap);
 
+        // Collector Registry for prometheus metrics
+        CollectorRegistry collectorRegistry = new CollectorRegistry();
+
         containerFactory = new ThreadRuntimeFactory("LocalRunnerThreadGroup", 
pulsarServiceUrl,
                 stateStorageServiceUrl,
                 
AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthenticationPlugin)
@@ -169,7 +191,7 @@ public void start() throws Exception {
                         
.tlsAllowInsecureConnection(isTrue(tlsAllowInsecureConnection))
                         
.tlsHostnameVerificationEnable(isTrue(tlsHostNameVerificationEnabled))
                         .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
-                secretsProvider);
+                secretsProvider, collectorRegistry);
         runtimeSpawner = new RuntimeSpawner(
                 instanceConfig,
                 jarFile,
@@ -195,12 +217,13 @@ public void run() {
             }
         });
 
-        // registering jvm metrics to prometheus
-        DefaultExports.initialize();
-
         log.info("Starting runtimeSpawner");
         runtimeSpawner.start();
 
+        // starting metrics server
+        log.info("Starting metrics server on port {}", metrics_port);
+        metricsServer = new HTTPServer(new InetSocketAddress(metrics_port), 
collectorRegistry, true);
+
         if (expectedHealthCheckInterval > 0) {
             timer = Executors.newSingleThreadScheduledExecutor();
             timer.scheduleAtFixedRate(new TimerTask() {
@@ -253,6 +276,9 @@ public void close() {
             if (containerFactory != null) {
                 containerFactory.close();
             }
+            if (metricsServer != null) {
+                metricsServer.stop();
+            }
         } catch (Exception ex) {
             System.err.println(ex);
         }
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index 6e716f21dc..4dc97ace14 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -88,9 +88,7 @@
     private static final String ENV_SHARD_ID = "SHARD_ID";
     private static final int maxJobNameSize = 55;
     private static final Integer GRPC_PORT = 9093;
-    private static final Integer PROMETHEUS_PORT = 9094;
-    private static final Double prometheusMetricsServerCpu = 0.1;
-    private static final Long prometheusMetricsServerRam = 125000000l;
+    private static final Integer METRICS_PORT = 9094;
     public static final Pattern VALID_POD_NAME_REGEX =
             
Pattern.compile("[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*",
                     Pattern.CASE_INSENSITIVE);
@@ -110,7 +108,6 @@
     // The thread that invokes the function
     @Getter
     private List<String> processArgs;
-    private List<String> prometheusMetricsServerArgs;
     @Getter
     private ManagedChannel[] channel;
     private InstanceControlGrpc.InstanceControlFutureStub[] stub;
@@ -190,8 +187,8 @@
             secretsProviderConfig,
             installUserCodeDependencies,
             pythonDependencyRepository,
-            pythonExtraDependencyRepository);
-        this.prometheusMetricsServerArgs = 
composePrometheusMetricsServerArgs(prometheusMetricsServerJarFile, 
expectedMetricsInterval);
+            pythonExtraDependencyRepository,
+                METRICS_PORT);
         running = false;
         doChecks(instanceConfig.getFunctionDetails());
     }
@@ -453,14 +450,6 @@ public void deleteService() throws Exception {
         );
     }
 
-    protected List<String> getPrometheusMetricsServerCommand() {
-        return Arrays.asList(
-                "sh",
-                "-c",
-                String.join(" ", prometheusMetricsServerArgs)
-        );
-    }
-
     private List<String> getDownloadCommand(String bkPath, String 
userCodeFilePath) {
         return Arrays.asList(
                 pulsarRootDir + "/bin/pulsar-admin",
@@ -525,15 +514,16 @@ private V1StatefulSet createStatefulSet() {
     private Map<String, String> getPrometheusAnnotations() {
         final Map<String, String> annotations = new HashMap<>();
         annotations.put("prometheus.io/scrape", "true");
-        annotations.put("prometheus.io/port", String.valueOf(PROMETHEUS_PORT));
+        annotations.put("prometheus.io/port", String.valueOf(METRICS_PORT));
         return annotations;
     }
 
     private Map<String, String> getLabels(Function.FunctionDetails 
functionDetails) {
         final Map<String, String> labels = new HashMap<>();
-        labels.put("app", createJobName(functionDetails));
-        labels.put("namespace", functionDetails.getNamespace());
+        labels.put("namespace", 
String.format("%s/%s",functionDetails.getTenant(), 
functionDetails.getNamespace()));
         labels.put("tenant", functionDetails.getTenant());
+        labels.put("function", String.format("%s/%s/%s", 
functionDetails.getTenant(),
+                functionDetails.getNamespace(), functionDetails.getName()));
         if (customLabels != null && !customLabels.isEmpty()) {
             labels.putAll(customLabels);
         }
@@ -552,7 +542,6 @@ private V1PodSpec getPodSpec(List<String> instanceCommand, 
Function.Resources re
 
         List<V1Container> containers = new LinkedList<>();
         containers.add(getFunctionContainer(instanceCommand, resource));
-        containers.add(getPrometheusContainer());
         podSpec.containers(containers);
 
         // Configure secrets
@@ -607,38 +596,6 @@ private V1Container getFunctionContainer(List<String> 
instanceCommand, Function.
         return container;
     }
 
-    private V1Container getPrometheusContainer() {
-        final V1Container container = new 
V1Container().name("prometheusmetricsserver");
-
-        // set up the container images
-        container.setImage(pulsarDockerImageName);
-
-        // set up the container command
-        container.setCommand(getPrometheusMetricsServerCommand());
-
-        // setup the environment variables for the container
-        final V1EnvVar envVarPodName = new V1EnvVar();
-        envVarPodName.name("POD_NAME")
-                .valueFrom(new V1EnvVarSource()
-                        .fieldRef(new V1ObjectFieldSelector()
-                                .fieldPath("metadata.name")));
-        container.setEnv(Arrays.asList(envVarPodName));
-
-
-        // set container resources
-        final V1ResourceRequirements resourceRequirements = new 
V1ResourceRequirements();
-        final Map<String, Quantity> requests = new HashMap<>();
-        requests.put("memory", 
Quantity.fromString(Long.toString(prometheusMetricsServerRam)));
-        requests.put("cpu", 
Quantity.fromString(Double.toString(prometheusMetricsServerCpu)));
-        resourceRequirements.setRequests(requests);
-        container.setResources(resourceRequirements);
-
-        // set container ports
-        container.setPorts(getPrometheusContainerPorts());
-
-        return container;
-    }
-
     private List<V1ContainerPort> getFunctionContainerPorts() {
         List<V1ContainerPort> ports = new ArrayList<>();
         final V1ContainerPort port = new V1ContainerPort();
@@ -652,7 +609,7 @@ private V1Container getPrometheusContainer() {
         List<V1ContainerPort> ports = new ArrayList<>();
         final V1ContainerPort port = new V1ContainerPort();
         port.setName("prometheus");
-        port.setContainerPort(PROMETHEUS_PORT);
+        port.setContainerPort(METRICS_PORT);
         ports.add(port);
         return ports;
     }
@@ -680,24 +637,4 @@ public static void doChecks(Function.FunctionDetails 
functionDetails) {
             throw new RuntimeException("Kubernetes job name size should be 
less than " + maxJobNameSize);
         }
     }
-
-    private List<String> composePrometheusMetricsServerArgs(String 
prometheusMetricsServerFile,
-                                                            Integer 
expectedMetricsInterval) throws Exception {
-        List<String> args = new LinkedList<>();
-        args.add("java");
-        args.add("-cp");
-        args.add(prometheusMetricsServerFile);
-        
args.add("-Dlog4j.configurationFile=prometheus_metricsserver_log4j2.yml");
-        args.add("-Xmx" + String.valueOf(prometheusMetricsServerRam));
-        args.add(PrometheusMetricsServer.class.getName());
-        args.add("--function_details");
-        args.add("'" + 
JsonFormat.printer().omittingInsignificantWhitespace().print(instanceConfig.getFunctionDetails())
 + "'");
-        args.add("--prometheus_port");
-        args.add(String.valueOf(PROMETHEUS_PORT));
-        args.add("--grpc_port");
-        args.add(String.valueOf(GRPC_PORT));
-        args.add("--collection_interval");
-        args.add(String.valueOf(expectedMetricsInterval));
-        return args;
-    }
 }
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
index e85a45d593..733be37dd5 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
@@ -189,6 +189,7 @@ protected static void 
startLocalRun(org.apache.pulsar.functions.proto.Function.F
                 instanceConfig.setInstanceId(i + instanceIdOffset);
                 instanceConfig.setMaxBufferedTuples(1024);
                 instanceConfig.setPort(Utils.findAvailablePort());
+                instanceConfig.setClusterName("local");
                 RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
                         instanceConfig,
                         userCodeFile,
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 0e4319f561..9ac20ab80d 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -36,6 +36,7 @@
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.proto.InstanceControlGrpc;
 import 
org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
+import org.apache.pulsar.functions.utils.Utils;
 
 import java.io.InputStream;
 import java.util.List;
@@ -117,7 +118,8 @@
             secretsProviderConfig,
             false,
             null,
-            null);
+            null,
+                Utils.findAvailablePort());
     }
 
     /**
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index e84ec203fe..05f21e614c 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -57,7 +57,8 @@
                                            String secretsProviderConfig,
                                            Boolean 
installUserCodeDepdendencies,
                                            String pythonDependencyRepository,
-                                           String 
pythonExtraDependencyRepository) throws Exception {
+                                           String 
pythonExtraDependencyRepository,
+                                           int metricsPort) throws Exception {
         List<String> args = new LinkedList<>();
         if (instanceConfig.getFunctionDetails().getRuntime() == 
Function.FunctionDetails.Runtime.JAVA) {
             args.add("java");
@@ -160,6 +161,9 @@
         args.add("--port");
         args.add(String.valueOf(grpcPort));
 
+        args.add("--metrics_port");
+        args.add(String.valueOf(metricsPort));
+
         // state storage configs
         if (null != stateStorageServiceUrl
                 && instanceConfig.getFunctionDetails().getRuntime() == 
Function.FunctionDetails.Runtime.JAVA) {
@@ -177,6 +181,9 @@
                 args.add("'" + secretsProviderConfig + "'");
             }
         }
+
+        args.add("--cluster_name");
+        args.add(instanceConfig.getClusterName());
         return args;
     }
 }
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
index 5e42c52ea2..913d4ae2a8 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
@@ -21,6 +21,7 @@
 
 import java.util.concurrent.CompletableFuture;
 
+import io.prometheus.client.CollectorRegistry;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -53,18 +54,28 @@
                   String jarFile,
                   PulsarClient pulsarClient,
                   String stateStorageServiceUrl,
-                  SecretsProvider secretsProvider) {
+                  SecretsProvider secretsProvider,
+                  CollectorRegistry collectorRegistry) {
         this.instanceConfig = instanceConfig;
         if (instanceConfig.getFunctionDetails().getRuntime() != 
Function.FunctionDetails.Runtime.JAVA) {
             throw new RuntimeException("Thread Container only supports Java 
Runtime");
         }
+
+        // if collector registry is not set, create one for this thread.
+        // since each thread / instance will needs its own collector registry 
for metrics collection
+        CollectorRegistry instanceCollectorRegistry = collectorRegistry;
+        if (instanceCollectorRegistry == null) {
+            instanceCollectorRegistry = new CollectorRegistry();
+        }
+
         this.javaInstanceRunnable = new JavaInstanceRunnable(
             instanceConfig,
             fnCache,
             jarFile,
             pulsarClient,
             stateStorageServiceUrl,
-            secretsProvider);
+            secretsProvider,
+            instanceCollectorRegistry);
         this.threadGroup = threadGroup;
     }
 
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
index 846028d40a..ed76117735 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
@@ -21,6 +21,7 @@
 
 import com.google.common.annotations.VisibleForTesting;
 
+import io.prometheus.client.CollectorRegistry;
 import lombok.extern.slf4j.Slf4j;
 
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
@@ -45,21 +46,23 @@
     private final PulsarClient pulsarClient;
     private final String storageServiceUrl;
     private final SecretsProvider secretsProvider;
+    private final CollectorRegistry collectorRegistry;
     private volatile boolean closed;
 
     public ThreadRuntimeFactory(String threadGroupName, String 
pulsarServiceUrl, String storageServiceUrl,
-                                AuthenticationConfig authConfig, 
SecretsProvider secretsProvider) throws Exception {
-        this(threadGroupName, createPulsarClient(pulsarServiceUrl, 
authConfig), storageServiceUrl, secretsProvider);
+                                AuthenticationConfig authConfig, 
SecretsProvider secretsProvider, CollectorRegistry collectorRegistry) throws 
Exception {
+        this(threadGroupName, createPulsarClient(pulsarServiceUrl, 
authConfig), storageServiceUrl, secretsProvider, collectorRegistry);
     }
 
     @VisibleForTesting
     public ThreadRuntimeFactory(String threadGroupName, PulsarClient 
pulsarClient, String storageServiceUrl,
-                                SecretsProvider secretsProvider) {
+                                SecretsProvider secretsProvider, 
CollectorRegistry collectorRegistry) {
         this.secretsProvider = secretsProvider;
         this.fnCache = new FunctionCacheManagerImpl();
         this.threadGroup = new ThreadGroup(threadGroupName);
         this.pulsarClient = pulsarClient;
         this.storageServiceUrl = storageServiceUrl;
+        this.collectorRegistry = collectorRegistry;
     }
 
     private static PulsarClient createPulsarClient(String pulsarServiceUrl, 
AuthenticationConfig authConfig)
@@ -94,7 +97,8 @@ public ThreadRuntime createContainer(InstanceConfig 
instanceConfig, String jarFi
             jarFile,
             pulsarClient,
             storageServiceUrl,
-            secretsProvider);
+            secretsProvider,
+            collectorRegistry);
     }
 
     @Override
diff --git 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
index 2e00983ec5..c22970efb8 100644
--- 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
+++ 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
@@ -192,6 +192,7 @@ InstanceConfig 
createJavaInstanceConfig(FunctionDetails.Runtime runtime, boolean
         config.setFunctionVersion("1.0");
         config.setInstanceId(0);
         config.setMaxBufferedTuples(1024);
+        config.setClusterName("standalone");
 
         return config;
     }
@@ -233,16 +234,19 @@ private void verifyJavaInstance(InstanceConfig config, 
String depsDir, boolean s
         String classpath = javaInstanceJarFile;
         String extraDepsEnv;
         int portArg;
+        int metricsPortArg;
         int totalArgs;
         if (null != depsDir) {
             extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" + 
depsDir;
             classpath = classpath + ":" + depsDir + "/*";
-            totalArgs = 29;
+            totalArgs = 33;
             portArg = 24;
+            metricsPortArg = 26;
         } else {
             extraDepsEnv = "";
             portArg = 23;
-            totalArgs = 28;
+            metricsPortArg = 25;
+            totalArgs = 32;
         }
         if (secretsAttached) {
             totalArgs += 4;
@@ -263,13 +267,15 @@ private void verifyJavaInstance(InstanceConfig config, 
String depsDir, boolean s
                 + " --function_version " + config.getFunctionVersion()
                 + " --function_details '" + 
JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
                 + "' --pulsar_serviceurl " + pulsarServiceUrl
-                + " --max_buffered_tuples 1024 --port " + args.get(portArg)
+                + " --max_buffered_tuples 1024 --port " + args.get(portArg) + 
" --metrics_port " + args.get(metricsPortArg)
                 + " --state_storage_serviceurl " + stateStorageServiceUrl
                 + " --expected_healthcheck_interval -1";
         if (secretsAttached) {
             expectedArgs += " --secrets_provider 
org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider"
                     + " --secrets_provider_config 
'{\"Somevalue\":\"myvalue\"}'";
         }
+        expectedArgs += " --cluster_name standalone";
+
         assertEquals(String.join(" ", args), expectedArgs);
     }
 
@@ -301,15 +307,18 @@ private void verifyPythonInstance(InstanceConfig config, 
String extraDepsDir, bo
         int portArg;
         String pythonPath;
         int configArg;
+        int metricsPortArg;
         if (null == extraDepsDir) {
-            totalArgs = 32;
+            totalArgs = 36;
             portArg = 29;
             configArg = 9;
             pythonPath = "";
+            metricsPortArg = 31;
         } else {
-            totalArgs = 33;
+            totalArgs = 37;
             portArg = 30;
             configArg = 10;
+            metricsPortArg = 32;
             pythonPath = "PYTHONPATH=${PYTHONPATH}:" + extraDepsDir + " ";
         }
         if (secretsAttached) {
@@ -331,12 +340,13 @@ private void verifyPythonInstance(InstanceConfig config, 
String extraDepsDir, bo
                 + " --function_version " + config.getFunctionVersion()
                 + " --function_details '" + 
JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
                 + "' --pulsar_serviceurl " + pulsarServiceUrl
-                + " --max_buffered_tuples 1024 --port " + args.get(portArg)
+                + " --max_buffered_tuples 1024 --port " + args.get(portArg) + 
" --metrics_port " + args.get(metricsPortArg)
                 + " --expected_healthcheck_interval -1";
         if (secretsAttached) {
             expectedArgs += " --secrets_provider 
secretsprovider.ClearTextSecretsProvider"
                     + " --secrets_provider_config 
'{\"Somevalue\":\"myvalue\"}'";
         }
+        expectedArgs += " --cluster_name standalone";
         assertEquals(String.join(" ", args), expectedArgs);
     }
 
diff --git 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index 63875fd0f1..323943c13d 100644
--- 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -172,6 +172,7 @@ InstanceConfig 
createJavaInstanceConfig(FunctionDetails.Runtime runtime) {
         config.setFunctionVersion("1.0");
         config.setInstanceId(0);
         config.setMaxBufferedTuples(1024);
+        config.setClusterName("standalone");
 
         return config;
     }
@@ -250,15 +251,18 @@ private void verifyJavaInstance(InstanceConfig config, 
Path depsDir) throws Exce
         String classpath = javaInstanceJarFile;
         String extraDepsEnv;
         int portArg;
+        int metricsPortArg;
         if (null != depsDir) {
-            assertEquals(args.size(), 33);
+            assertEquals(args.size(), 37);
             extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" + 
depsDir.toString();
             classpath = classpath + ":" + depsDir + "/*";
             portArg = 24;
+            metricsPortArg = 26;
         } else {
-            assertEquals(args.size(), 32);
+            assertEquals(args.size(), 36);
             extraDepsEnv = "";
             portArg = 23;
+            metricsPortArg = 25;
         }
 
         String expectedArgs = "java -cp " + classpath
@@ -273,11 +277,12 @@ private void verifyJavaInstance(InstanceConfig config, 
Path depsDir) throws Exce
                 + " --function_version " + config.getFunctionVersion()
                 + " --function_details '" + 
JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
                 + "' --pulsar_serviceurl " + pulsarServiceUrl
-                + " --max_buffered_tuples 1024 --port " + args.get(portArg)
+                + " --max_buffered_tuples 1024 --port " + args.get(portArg) + 
" --metrics_port " + args.get(metricsPortArg)
                 + " --state_storage_serviceurl " + stateStorageServiceUrl
                 + " --expected_healthcheck_interval 30"
                 + " --secrets_provider 
org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider"
-                + " --secrets_provider_config '{\"Config\":\"Value\"}'";
+                + " --secrets_provider_config '{\"Config\":\"Value\"}'"
+                + " --cluster_name standalone";
         assertEquals(String.join(" ", args), expectedArgs);
     }
 
@@ -305,8 +310,9 @@ private void verifyPythonInstance(InstanceConfig config, 
String extraDepsDir) th
         ProcessRuntime container = factory.createContainer(config, 
userJarFile, null, 30l);
         List<String> args = container.getProcessArgs();
 
-        int totalArgs = 30;
+        int totalArgs = 34;
         int portArg = 23;
+        int metricsPortArg = 25;
         String pythonPath = "";
         int configArg = 9;
 
@@ -319,10 +325,11 @@ private void verifyPythonInstance(InstanceConfig config, 
String extraDepsDir) th
                 + " --function_version " + config.getFunctionVersion()
                 + " --function_details '" + 
JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
                 + "' --pulsar_serviceurl " + pulsarServiceUrl
-                + " --max_buffered_tuples 1024 --port " + args.get(portArg)
+                + " --max_buffered_tuples 1024 --port " + args.get(portArg) + 
" --metrics_port " + args.get(metricsPortArg)
                 + " --expected_healthcheck_interval 30"
                 + " --secrets_provider 
secretsprovider.ClearTextSecretsProvider"
-                + " --secrets_provider_config '{\"Config\":\"Value\"}'";
+                + " --secrets_provider_config '{\"Config\":\"Value\"}'"
+                + " --cluster_name standalone";
         assertEquals(String.join(" ", args), expectedArgs);
     }
 
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index 4f8d04a71d..46343a5b86 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -172,6 +172,7 @@ public void startFunction(FunctionRuntimeInfo 
functionRuntimeInfo) throws Except
         instanceConfig.setInstanceId(instanceId);
         instanceConfig.setMaxBufferedTuples(1024);
         
instanceConfig.setPort(org.apache.pulsar.functions.utils.Utils.findAvailablePort());
+        
instanceConfig.setClusterName(workerConfig.getPulsarFunctionsCluster());
 
         log.info("{}/{}/{}-{} start process with instance config {}", 
functionDetails.getTenant(), functionDetails.getNamespace(),
                 functionDetails.getName(), instanceId, instanceConfig);
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 25b97ce9c4..8a8fafa1a5 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -36,6 +36,7 @@
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriBuilder;
 
+import io.prometheus.client.CollectorRegistry;
 import lombok.Setter;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.distributedlog.api.namespace.Namespace;
@@ -130,7 +131,8 @@ public FunctionRuntimeManager(WorkerConfig workerConfig, 
WorkerService workerSer
                     workerConfig.getPulsarServiceUrl(),
                     workerConfig.getStateStorageServiceUrl(),
                     authConfig,
-                    new ClearTextSecretsProvider());
+                    new ClearTextSecretsProvider(),
+                     null);
         } else if (workerConfig.getProcessContainerFactory() != null) {
             this.runtimeFactory = new ProcessRuntimeFactory(
                     workerConfig.getPulsarServiceUrl(),
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
index e497ff3352..d2f1504510 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
@@ -73,13 +73,13 @@ public static void generate(WorkerService workerService, 
String cluster, SimpleT
                                 int instanceId = 
functionRuntimeInfo.getFunctionInstance().getInstanceId();
                                 String qualifiedNamespace = 
String.format("%s/%s", tenant, namespace);
 
-                                metric(out, cluster, qualifiedNamespace, name, 
String.format("%scount", metricName),
+                                metric(out, cluster, qualifiedNamespace, name, 
String.format("%s_count", metricName),
                                         instanceId, dataDigest.getCount());
-                                metric(out, cluster, qualifiedNamespace, name, 
String.format("%smax", metricName),
+                                metric(out, cluster, qualifiedNamespace, name, 
String.format("%s_max", metricName),
                                         instanceId, dataDigest.getMax());
-                                metric(out, cluster, qualifiedNamespace,name, 
String.format("%smin", metricName),
+                                metric(out, cluster, qualifiedNamespace,name, 
String.format("%s_min", metricName),
                                         instanceId, dataDigest.getMin());
-                                metric(out, cluster, qualifiedNamespace, name, 
String.format("%ssum", metricName),
+                                metric(out, cluster, qualifiedNamespace, name, 
String.format("%s_sum", metricName),
                                         instanceId, dataDigest.getSum());
 
                             }
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
index e35aa2b90b..c957de7e4b 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
@@ -87,10 +87,10 @@ public void testFunctionsStatsGenerate() {
         CompletableFuture<InstanceCommunication.MetricsData> 
metricsDataCompletableFuture = new CompletableFuture<>();
         InstanceCommunication.MetricsData metricsData = 
InstanceCommunication.MetricsData.newBuilder()
                 .putMetrics(
-                        "__function_total_processed__",
+                        "pulsar_function_processed_total",
                         
InstanceCommunication.MetricsData.DataDigest.newBuilder()
                                 
.setCount(100.0).setMax(200.0).setSum(300.0).setMin(0.0).build())
-                .putMetrics("__function_process_latency_ms__",
+                .putMetrics("pulsar_function_process_latency_ms",
                         
InstanceCommunication.MetricsData.DataDigest.newBuilder()
                                 
.setCount(10.0).setMax(20.0).setSum(30.0).setMin(0.0).build())
                 .build();
@@ -127,56 +127,56 @@ public void testFunctionsStatsGenerate() {
         Assert.assertEquals(metrics.size(), 8);
 
         System.out.println("metrics: " + metrics);
-        Metric m = metrics.get("__function_total_processed__count");
+        Metric m = metrics.get("pulsar_function_processed_total_count");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
         assertEquals(m.tags.get("name"), "func-1");
         assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
         assertEquals(m.value, 100.0);
 
-        m = metrics.get("__function_total_processed__max");
+        m = metrics.get("pulsar_function_processed_total_max");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
         assertEquals(m.tags.get("name"), "func-1");
         assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
         assertEquals(m.value, 200.0);
 
-        m = metrics.get("__function_total_processed__sum");
+        m = metrics.get("pulsar_function_processed_total_sum");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
         assertEquals(m.tags.get("name"), "func-1");
         assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
         assertEquals(m.value, 300.0);
 
-        m = metrics.get("__function_total_processed__min");
+        m = metrics.get("pulsar_function_processed_total_min");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
         assertEquals(m.tags.get("name"), "func-1");
         assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
         assertEquals(m.value, 0.0);
 
-        m = metrics.get("__function_process_latency_ms__count");
+        m = metrics.get("pulsar_function_process_latency_ms_count");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
         assertEquals(m.tags.get("name"), "func-1");
         assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
         assertEquals(m.value, 10.0);
 
-        m = metrics.get("__function_process_latency_ms__max");
+        m = metrics.get("pulsar_function_process_latency_ms_max");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
         assertEquals(m.tags.get("name"), "func-1");
         assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
         assertEquals(m.value, 20.0);
 
-        m = metrics.get("__function_process_latency_ms__sum");
+        m = metrics.get("pulsar_function_process_latency_ms_sum");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
         assertEquals(m.tags.get("name"), "func-1");
         assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
         assertEquals(m.value, 30.0);
 
-        m = metrics.get("__function_process_latency_ms__min");
+        m = metrics.get("pulsar_function_process_latency_ms_min");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
         assertEquals(m.tags.get("name"), "func-1");
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 066ca816a5..ffd1bd6bff 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -21,6 +21,7 @@
 import com.google.common.collect.Sets;
 import com.google.protobuf.InvalidProtocolBufferException;
 import io.netty.util.concurrent.DefaultThreadFactory;
+import io.prometheus.client.CollectorRegistry;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.MessageId;
@@ -128,7 +129,7 @@ public void setup() {
     public void stop() {
         this.executor.shutdown();
     }
-    
+
     @Test
     public void testSchedule() throws Exception {
 
@@ -141,7 +142,7 @@ public void testSchedule() throws Exception {
         functionMetaDataList.add(function1);
         
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, 
"dummy", new ClearTextSecretsProvider());
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, 
"dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -187,7 +188,7 @@ public void testNothingNewToSchedule() throws Exception {
         functionMetaDataList.add(function1);
         
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, 
"dummy", new ClearTextSecretsProvider());
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, 
"dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -234,7 +235,7 @@ public void testAddingFunctions() throws Exception {
         functionMetaDataList.add(function2);
         
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, 
"dummy", new ClearTextSecretsProvider());
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, 
"dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -294,7 +295,7 @@ public void testDeletingFunctions() throws Exception {
         functionMetaDataList.add(function1);
         
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, 
"dummy", new ClearTextSecretsProvider());
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, 
"dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -359,7 +360,8 @@ public void testScalingUp() throws Exception {
         functionMetaDataList.add(function2);
         
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, 
"dummy", new ClearTextSecretsProvider());
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, 
"dummy", new ClearTextSecretsProvider
+                (), new CollectorRegistry());
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -470,7 +472,7 @@ public void testScalingDown() throws Exception {
         functionMetaDataList.add(function2);
         
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, 
"dummy", new ClearTextSecretsProvider());
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, 
"dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -597,7 +599,7 @@ public void testHeartbeatFunction() throws Exception {
         functionMetaDataList.add(function2);
         
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, 
"dummy", new ClearTextSecretsProvider());
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, 
"dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         Map<String, Map<String, Function.Assignment>> currentAssignments = new 
HashMap<>();
@@ -651,7 +653,7 @@ public void testUpdate() throws Exception {
         functionMetaDataList.add(function2);
         
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, 
"dummy", new ClearTextSecretsProvider());
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, 
"dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -784,7 +786,7 @@ public void testAssignmentWorkerDoesNotExist() throws 
InterruptedException, NoSu
         functionMetaDataList.add(function2);
         
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, 
"dummy", new ClearTextSecretsProvider());
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, 
"dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to