This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 900e747  Expose metrics via http port in function instance (#2930)
900e747 is described below

commit 900e747908cfc916f9ff8ea5ff24041133c4ac99
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Thu Nov 8 12:16:07 2018 -0800

    Expose metrics via http port in function instance (#2930)
    
    * fix bugs in python metrics
    
    * instance expose metrics
    
    * remove commented out code
    
    * fix unit tests
    
    * remove commented out code
    
    * fixing test
    
    * fix python instance test
    
    * removing old code
    
    * fix bug
    
    * refactoring java metrics
    
    * refactoring python metrics
    
    * cleaning up code
    
    * removing unnecessary code
    
    * improving metrics format
    
    * fixing test
    
    * fix bugs and revising format
    
    * fix bug
    
    * fix for python3
    
    * change user defined metric to summary
    
    * renaming labels
    
    * change back python
---
 .../pulsar/functions/instance/ContextImpl.java     |  73 +++++++++----
 .../pulsar/functions/instance/FunctionStats.java   |  58 ++++++++---
 .../pulsar/functions/instance/InstanceConfig.java  |   1 +
 .../functions/instance/JavaInstanceRunnable.java   |  56 ++++++----
 .../instance/src/main/python/contextimpl.py        |  42 +++++---
 .../instance/src/main/python/function_stats.py     |  75 ++++++++++++++
 .../instance/src/main/python/python_instance.py    | 115 ++++++++-------------
 .../src/main/python/python_instance_main.py        |   8 +-
 .../pulsar/functions/instance/ContextImplTest.java |   3 +-
 .../instance/JavaInstanceRunnableTest.java         |   2 +-
 .../src/test/python/test_python_instance.py        |   2 +-
 .../pulsar/functions/runtime/JavaInstanceMain.java |  34 +++++-
 .../functions/runtime/KubernetesRuntime.java       |  79 ++------------
 .../pulsar/functions/runtime/LocalRunner.java      |   1 +
 .../pulsar/functions/runtime/ProcessRuntime.java   |   4 +-
 .../pulsar/functions/runtime/RuntimeUtils.java     |   9 +-
 .../pulsar/functions/runtime/ThreadRuntime.java    |  15 ++-
 .../functions/runtime/ThreadRuntimeFactory.java    |  12 ++-
 .../functions/runtime/KubernetesRuntimeTest.java   |  22 ++--
 .../functions/runtime/ProcessRuntimeTest.java      |  21 ++--
 .../pulsar/functions/worker/FunctionActioner.java  |   1 +
 .../functions/worker/FunctionRuntimeManager.java   |   4 +-
 .../functions/worker/FunctionsStatsGenerator.java  |   8 +-
 .../worker/FunctionStatsGeneratorTest.java         |  20 ++--
 .../functions/worker/SchedulerManagerTest.java     |  22 ++--
 25 files changed, 417 insertions(+), 270 deletions(-)

diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 406fe13..d13964d 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -18,25 +18,12 @@
  */
 package org.apache.pulsar.functions.instance;
 
-import static com.google.common.base.Preconditions.checkState;
-
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Summary;
 import lombok.Getter;
 import lombok.Setter;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -55,9 +42,24 @@ import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.SourceContext;
 import org.slf4j.Logger;
 
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkState;
+
 /**
  * This class implements the Context interface exposed to the user.
  */
+
 class ContextImpl implements Context, SinkContext, SourceContext {
     private InstanceConfig config;
     private Logger logger;
@@ -92,7 +94,6 @@ class ContextImpl implements Context, SinkContext, 
SourceContext {
         }
     }
 
-    private ConcurrentMap<String, AccumulatedMetricDatum> 
currentAccumulatedMetrics;
     private ConcurrentMap<String, AccumulatedMetricDatum> accumulatedMetrics;
 
     private Map<String, Producer<?>> publishProducers;
@@ -110,11 +111,21 @@ class ContextImpl implements Context, SinkContext, 
SourceContext {
     private StateContextImpl stateContext;
     private Map<String, Object> userConfigs;
 
+    Map<String, String[]> userMetricsLabels = new HashMap<>();
+    private final String[] metricsLabels;
+    private final Summary userMetricsSummary;
+
+    private final static String[] userMetricsLabelNames;
+    static {
+        // add label to indicate user metric
+        userMetricsLabelNames = Arrays.copyOf(FunctionStats.metricsLabelNames, 
FunctionStats.metricsLabelNames.length + 1);
+        userMetricsLabelNames[FunctionStats.metricsLabelNames.length] = 
"metric";
+    }
+
     public ContextImpl(InstanceConfig config, Logger logger, PulsarClient 
client, List<String> inputTopics,
-                       SecretsProvider secretsProvider) {
+                       SecretsProvider secretsProvider, CollectorRegistry 
collectorRegistry, String[] metricsLabels) {
         this.config = config;
         this.logger = logger;
-        this.currentAccumulatedMetrics = new ConcurrentHashMap<>();
         this.accumulatedMetrics = new ConcurrentHashMap<>();
         this.publishProducers = new HashMap<>();
         this.inputTopics = inputTopics;
@@ -138,6 +149,17 @@ class ContextImpl implements Context, SinkContext, 
SourceContext {
         } else {
             secretsMap = new HashMap<>();
         }
+
+        this.metricsLabels = metricsLabels;
+        this.userMetricsSummary = Summary.build()
+                .name("pulsar_function_user_metric")
+                .help("Pulsar Function user defined metric.")
+                .labelNames(userMetricsLabelNames)
+                .quantile(0.5, 0.01)
+                .quantile(0.9, 0.01)
+                .quantile(0.99, 0.01)
+                .quantile(0.999, 0.01)
+                .register(collectorRegistry);
     }
 
     public void setCurrentMessageContext(Record<?> record) {
@@ -320,8 +342,16 @@ class ContextImpl implements Context, SinkContext, 
SourceContext {
 
     @Override
     public void recordMetric(String metricName, double value) {
-        currentAccumulatedMetrics.putIfAbsent(metricName, new 
AccumulatedMetricDatum());
-        currentAccumulatedMetrics.get(metricName).update(value);
+        userMetricsLabels.computeIfAbsent(metricName,
+                s -> {
+                    String[] userMetricLabels = Arrays.copyOf(metricsLabels, 
metricsLabels.length + 1);
+                    userMetricLabels[userMetricLabels.length - 1] = metricName;
+                    return userMetricLabels;
+                });
+
+        
userMetricsSummary.labels(userMetricsLabels.get(metricName)).observe(value);
+        accumulatedMetrics.putIfAbsent(metricName, new 
AccumulatedMetricDatum());
+        accumulatedMetrics.get(metricName).update(value);
     }
 
     public MetricsData getAndResetMetrics() {
@@ -331,9 +361,8 @@ class ContextImpl implements Context, SinkContext, 
SourceContext {
     }
 
     public void resetMetrics() {
+        userMetricsSummary.clear();
         this.accumulatedMetrics.clear();
-        this.accumulatedMetrics.putAll(currentAccumulatedMetrics);
-        this.currentAccumulatedMetrics.clear();
     }
 
     public MetricsData getMetrics() {
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java
index 15b01f7..c456997 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.functions.instance;
 import com.google.common.collect.EvictingQueue;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
 import io.prometheus.client.Summary;
 import lombok.Getter;
 import lombok.Setter;
@@ -35,7 +36,16 @@ import 
org.apache.pulsar.functions.proto.InstanceCommunication;
 @Setter
 public class FunctionStats {
 
-    private static final String[] metricsLabelNames = {"tenant", "namespace", 
"name", "instance_id"};
+    static final String[] metricsLabelNames = {"tenant", "namespace", 
"function", "instance_id", "cluster"};
+
+    /** Declare metric names **/
+    static final String PULSAR_FUNCTION_PROCESSED_TOTAL = 
"pulsar_function_processed_total";
+    static final String PULSAR_FUNCTION_PROCESSED_SUCCESSFULLY_TOTAL = 
"pulsar_function_processed_successfully_total";
+    static final String PULSAR_FUNCTION_SYSTEM_EXCEPTIONS_TOTAL = 
"pulsar_function_system_exceptions_total";
+    static final String PULSAR_FUNCTION_USER_EXCEPTIONS_TOTAL = 
"pulsar_function_user_exceptions_total";
+    static final String PULSAR_FUNCTION_PROCESS_LATENCY_MS = 
"pulsar_function_process_latency_ms";
+    static final String PULSAR_FUNCTION_LAST_INVOCATION = 
"pulsar_function_last_invocation";
+    static final String PULSAR_FUNCTION_RECEIVED_TOTAL = 
"pulsar_function_received_total";
 
     /** Declare Prometheus stats **/
 
@@ -49,6 +59,10 @@ public class FunctionStats {
 
     final Summary statProcessLatency;
 
+    final Gauge statlastInvocation;
+
+    final Counter statTotalRecordsRecieved;
+
     CollectorRegistry functionCollectorRegistry;
 
     @Getter
@@ -56,47 +70,56 @@ public class FunctionStats {
     @Getter
     private 
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> 
latestSystemExceptions = EvictingQueue.create(10);
 
-    @Getter
-    @Setter
-    private long lastInvocationTime = 0;
-
-    public FunctionStats() {
+    public FunctionStats(CollectorRegistry collectorRegistry) {
         // Declare function local collector registry so that it will not clash 
with other function instances'
         // metrics collection especially in threaded mode
         functionCollectorRegistry = new CollectorRegistry();
 
         statTotalProcessed = Counter.build()
-                .name("__function_total_processed__")
+                .name(PULSAR_FUNCTION_PROCESSED_TOTAL)
                 .help("Total number of messages processed.")
                 .labelNames(metricsLabelNames)
-                .register(functionCollectorRegistry);
+                .register(collectorRegistry);
 
         statTotalProcessedSuccessfully = Counter.build()
-                .name("__function_total_successfully_processed__")
+                .name(PULSAR_FUNCTION_PROCESSED_SUCCESSFULLY_TOTAL)
                 .help("Total number of messages processed successfully.")
                 .labelNames(metricsLabelNames)
-                .register(functionCollectorRegistry);
+                .register(collectorRegistry);
 
         statTotalSysExceptions = Counter.build()
-                .name("__function_total_system_exceptions__")
+                .name(PULSAR_FUNCTION_SYSTEM_EXCEPTIONS_TOTAL)
                 .help("Total number of system exceptions.")
                 .labelNames(metricsLabelNames)
-                .register(functionCollectorRegistry);
+                .register(collectorRegistry);
 
         statTotalUserExceptions = Counter.build()
-                .name("__function_total_user_exceptions__")
+                .name(PULSAR_FUNCTION_USER_EXCEPTIONS_TOTAL)
                 .help("Total number of user exceptions.")
                 .labelNames(metricsLabelNames)
-                .register(functionCollectorRegistry);
+                .register(collectorRegistry);
 
         statProcessLatency = Summary.build()
-                .name("__function_process_latency_ms__").help("Process latency 
in milliseconds.")
+                .name(PULSAR_FUNCTION_PROCESS_LATENCY_MS)
+                .help("Process latency in milliseconds.")
                 .quantile(0.5, 0.01)
                 .quantile(0.9, 0.01)
                 .quantile(0.99, 0.01)
                 .quantile(0.999, 0.01)
                 .labelNames(metricsLabelNames)
-                .register(functionCollectorRegistry);
+                .register(collectorRegistry);
+
+        statlastInvocation = Gauge.build()
+                .name(PULSAR_FUNCTION_LAST_INVOCATION)
+                .help("The timestamp of the last invocation of the function")
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
+
+        statTotalRecordsRecieved = Counter.build()
+                .name(PULSAR_FUNCTION_RECEIVED_TOTAL)
+                .help("Total number of messages received from source.")
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
     }
 
     public void addUserException(Exception ex) {
@@ -120,8 +143,9 @@ public class FunctionStats {
         statTotalSysExceptions.clear();
         statTotalUserExceptions.clear();
         statProcessLatency.clear();
+        statlastInvocation.clear();
+        statTotalRecordsRecieved.clear();
         latestUserExceptions.clear();
         latestSystemExceptions.clear();
-        lastInvocationTime = 0;
     }
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
index 040af91..51ec0d4 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
@@ -41,6 +41,7 @@ public class InstanceConfig {
     private FunctionDetails functionDetails;
     private int maxBufferedTuples;
     private int port;
+    private String clusterName;
 
     /**
      * Get the string representation of {@link #getInstanceId()}.
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 9e5ae51..ab900ac 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -22,6 +22,7 @@ package org.apache.pulsar.functions.instance;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import io.netty.buffer.ByteBuf;
+import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.Summary;
 import lombok.AccessLevel;
 import lombok.Getter;
@@ -45,6 +46,8 @@ import org.apache.logging.log4j.core.config.LoggerConfig;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.functions.ConsumerConfig;
+import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.instance.state.StateContextImpl;
@@ -58,8 +61,6 @@ import org.apache.pulsar.functions.sink.PulsarSinkConfig;
 import org.apache.pulsar.functions.sink.PulsarSinkDisable;
 import org.apache.pulsar.functions.source.PulsarSource;
 import org.apache.pulsar.functions.source.PulsarSourceConfig;
-import org.apache.pulsar.common.functions.ConsumerConfig;
-import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.StateUtils;
@@ -116,6 +117,8 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
     private Sink sink;
 
     private final SecretsProvider secretsProvider;
+
+    private CollectorRegistry collectorRegistry;
     private final String[] metricsLabels;
 
     public JavaInstanceRunnable(InstanceConfig instanceConfig,
@@ -123,19 +126,25 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
                                 String jarFile,
                                 PulsarClient pulsarClient,
                                 String stateStorageServiceUrl,
-                                SecretsProvider secretsProvider) {
+                                SecretsProvider secretsProvider,
+                                CollectorRegistry collectorRegistry) {
         this.instanceConfig = instanceConfig;
         this.fnCache = fnCache;
         this.jarFile = jarFile;
         this.client = (PulsarClientImpl) pulsarClient;
         this.stateStorageServiceUrl = stateStorageServiceUrl;
-        this.stats = new FunctionStats();
+        this.stats = new FunctionStats(collectorRegistry);
         this.secretsProvider = secretsProvider;
+        this.collectorRegistry = collectorRegistry;
         this.metricsLabels = new String[]{
                 instanceConfig.getFunctionDetails().getTenant(),
-                instanceConfig.getFunctionDetails().getNamespace(),
-                instanceConfig.getFunctionDetails().getName(),
-                String.valueOf(instanceConfig.getInstanceId())
+                String.format("%s/%s", 
instanceConfig.getFunctionDetails().getTenant(),
+                        instanceConfig.getFunctionDetails().getNamespace()),
+                String.format("%s/%s/%s", 
instanceConfig.getFunctionDetails().getTenant(),
+                        instanceConfig.getFunctionDetails().getNamespace(),
+                        instanceConfig.getFunctionDetails().getName()),
+                String.valueOf(instanceConfig.getInstanceId()),
+                instanceConfig.getClusterName()
         };
     }
 
@@ -181,7 +190,7 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
         }
         Logger instanceLog = LoggerFactory.getLogger(
                 "function-" + instanceConfig.getFunctionDetails().getName());
-        return new ContextImpl(instanceConfig, instanceLog, client, 
inputTopics, secretsProvider);
+        return new ContextImpl(instanceConfig, instanceLog, client, 
inputTopics, secretsProvider, collectorRegistry, metricsLabels);
     }
 
     /**
@@ -201,6 +210,9 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
             while (true) {
                 currentRecord = readInput();
 
+                // increment number of records received from source
+                stats.statTotalRecordsRecieved.labels(metricsLabels).inc();
+
                 if 
(instanceConfig.getFunctionDetails().getProcessingGuarantees() == 
org.apache.pulsar.functions
                         .proto.Function.ProcessingGuarantees.ATMOST_ONCE) {
                     if (instanceConfig.getFunctionDetails().getAutoAck()) {
@@ -212,7 +224,7 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
                 JavaExecutionResult result;
 
                 // set last invocation time
-                stats.setLastInvocationTime(System.currentTimeMillis());
+                
stats.statlastInvocation.labels(metricsLabels).set(System.currentTimeMillis());
 
                 // start time for process latency stat
                 Summary.Timer requestTimer = 
stats.statProcessLatency.labels(metricsLabels).startTimer();
@@ -331,7 +343,6 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
             stats.addUserException(result.getUserException() );
             srcRecord.fail();
         } else {
-            stats.statTotalProcessedSuccessfully.labels(metricsLabels).inc();
             if (result.getResult() != null) {
                 sendOutputMessage(srcRecord, result.getResult());
             } else {
@@ -340,6 +351,8 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
                     srcRecord.ack();
                 }
             }
+            // increment total successfully processed
+            stats.statTotalProcessedSuccessfully.labels(metricsLabels).inc();
         }
     }
 
@@ -439,22 +452,24 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
 
     private Builder createMetricsDataBuilder() {
         InstanceCommunication.MetricsData.Builder bldr = 
InstanceCommunication.MetricsData.newBuilder();
-        addSystemMetrics("__total_processed__", 
stats.statTotalProcessed.labels(metricsLabels).get(), bldr);
-        addSystemMetrics("__total_successfully_processed__", 
stats.statTotalProcessedSuccessfully.labels(metricsLabels).get(), bldr);
-        addSystemMetrics("__total_system_exceptions__",  
stats.statTotalSysExceptions.labels(metricsLabels).get(), bldr);
-        addSystemMetrics("__total_user_exceptions__", 
stats.statTotalUserExceptions.labels(metricsLabels).get(), bldr);
-        addSystemMetrics("__avg_latency_ms__",
+        addSystemMetrics(FunctionStats.PULSAR_FUNCTION_PROCESSED_TOTAL, 
stats.statTotalProcessed.labels(metricsLabels).get(), bldr);
+        
addSystemMetrics(FunctionStats.PULSAR_FUNCTION_PROCESSED_SUCCESSFULLY_TOTAL, 
stats.statTotalProcessedSuccessfully.labels(metricsLabels).get(), bldr);
+        
addSystemMetrics(FunctionStats.PULSAR_FUNCTION_SYSTEM_EXCEPTIONS_TOTAL,  
stats.statTotalSysExceptions.labels(metricsLabels).get(), bldr);
+        addSystemMetrics(FunctionStats.PULSAR_FUNCTION_USER_EXCEPTIONS_TOTAL, 
stats.statTotalUserExceptions.labels(metricsLabels).get(), bldr);
+        addSystemMetrics(FunctionStats.PULSAR_FUNCTION_RECEIVED_TOTAL, 
stats.statTotalRecordsRecieved.labels(metricsLabels).get(), bldr);
+        addSystemMetrics(FunctionStats.PULSAR_FUNCTION_PROCESS_LATENCY_MS,
                 stats.statProcessLatency.labels(metricsLabels).get().count <= 
0.0
                         ? 0 : 
stats.statProcessLatency.labels(metricsLabels).get().sum / 
stats.statProcessLatency.labels(metricsLabels).get().count,
                 bldr);
+        addSystemMetrics(FunctionStats.PULSAR_FUNCTION_LAST_INVOCATION, 
stats.statlastInvocation.labels(metricsLabels).get(), bldr);
         return bldr;
     }
 
     public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() {
         InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = 
InstanceCommunication.FunctionStatus.newBuilder();
-        
functionStatusBuilder.setNumProcessed((long)stats.statTotalProcessed.labels(metricsLabels).get());
-        
functionStatusBuilder.setNumSuccessfullyProcessed((long)stats.statTotalProcessedSuccessfully.labels(metricsLabels).get());
-        
functionStatusBuilder.setNumUserExceptions((long)stats.statTotalUserExceptions.labels(metricsLabels).get());
+        functionStatusBuilder.setNumProcessed((long) 
stats.statTotalProcessed.labels(metricsLabels).get());
+        functionStatusBuilder.setNumSuccessfullyProcessed((long) 
stats.statTotalProcessedSuccessfully.labels(metricsLabels).get());
+        functionStatusBuilder.setNumUserExceptions((long) 
stats.statTotalUserExceptions.labels(metricsLabels).get());
         stats.getLatestUserExceptions().forEach(ex -> {
             functionStatusBuilder.addLatestUserExceptions(ex);
         });
@@ -464,8 +479,9 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
         });
         functionStatusBuilder.setAverageLatency(
                 stats.statProcessLatency.labels(metricsLabels).get().count == 
0.0
-                        ? 0 : 
stats.statProcessLatency.labels(metricsLabels).get().sum / 
stats.statProcessLatency.labels(metricsLabels).get().count);
-        
functionStatusBuilder.setLastInvocationTime(stats.getLastInvocationTime());
+                        ? 0 : 
stats.statProcessLatency.labels(metricsLabels).get().sum / 
stats.statProcessLatency
+                        .labels(metricsLabels).get().count);
+        functionStatusBuilder.setLastInvocationTime((long) 
stats.statlastInvocation.labels(metricsLabels).get());
         return functionStatusBuilder;
     }
 
diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py 
b/pulsar-functions/instance/src/main/python/contextimpl.py
index 83a63aa..9a37d59 100644
--- a/pulsar-functions/instance/src/main/python/contextimpl.py
+++ b/pulsar-functions/instance/src/main/python/contextimpl.py
@@ -31,6 +31,9 @@ import pulsar
 import util
 import InstanceCommunication_pb2
 
+from prometheus_client import Summary
+from function_stats import Stats
+
 # For keeping track of accumulated metrics
 class AccumulatedMetricDatum(object):
   def __init__(self):
@@ -39,7 +42,7 @@ class AccumulatedMetricDatum(object):
     self.max = float('-inf')
     self.min = float('inf')
 
-  def record(self, value):
+  def update(self, value):
     self.count += 1
     self.sum += value
     if value > self.max:
@@ -48,14 +51,17 @@ class AccumulatedMetricDatum(object):
       self.min = value
 
 class ContextImpl(pulsar.Context):
-  def __init__(self, instance_config, logger, pulsar_client, user_code, 
consumers, secrets_provider):
+
+  # add label to indicate user metric
+  user_metrics_label_names = Stats.metrics_label_names + ["metric"]
+
+  def __init__(self, instance_config, logger, pulsar_client, user_code, 
consumers, secrets_provider, metrics_labels):
     self.instance_config = instance_config
     self.log = logger
     self.pulsar_client = pulsar_client
     self.user_code_dir = os.path.dirname(user_code)
     self.consumers = consumers
     self.secrets_provider = secrets_provider
-    self.current_accumulated_metrics = {}
     self.accumulated_metrics = {}
     self.publish_producers = {}
     self.publish_serializers = {}
@@ -69,6 +75,12 @@ class ContextImpl(pulsar.Context):
       if instance_config.function_details.secretsMap \
       else {}
 
+    self.metrics_labels = metrics_labels
+    self.user_metrics_labels = dict()
+    self.user_metrics_summary = Summary("pulsar_function_user_metric",
+                                    'Pulsar Function user defined metric',
+                                        ContextImpl.user_metrics_label_names)
+
   # Called on a per message basis to set the context for the current message
   def set_current_message_context(self, msgid, topic):
     self.current_message_id = msgid
@@ -117,9 +129,12 @@ class ContextImpl(pulsar.Context):
     return self.secrets_provider.provide_secret(secret_key, 
self.secrets_map[secret_key])
 
   def record_metric(self, metric_name, metric_value):
-    if not metric_name in self.current_accumulated_metrics:
-      self.current_accumulated_metrics[metric_name] = AccumulatedMetricDatum()
-    self.current_accumulated_metrics[metric_name].update(metric_value)
+    if metric_name not in self.user_metrics_labels:
+      self.user_metrics_labels[metric_name] = self.metrics_labels + 
[metric_name]
+    
self.user_metrics_summary.labels(*self.user_metrics_labels[metric_name]).observe(metric_value)
+    if not metric_name in self.accumulated_metrics:
+      self.accumulated_metrics[metric_name] = AccumulatedMetricDatum()
+    self.accumulated_metrics[metric_name].update(metric_value)
 
   def get_output_topic(self):
     return self.instance_config.function_details.output
@@ -164,17 +179,16 @@ class ContextImpl(pulsar.Context):
 
   def reset_metrics(self):
     # TODO: Make it thread safe
+    for labels in self.user_metrics_labels.values():
+      self.user_metrics_summary.labels(*labels)._sum.set(0.0)
+      self.user_metrics_summary.labels(*labels)._count.set(0.0)
     self.accumulated_metrics.clear()
-    self.accumulated_metrics.update(self.current_accumulated_metrics)
-    self.current_accumulated_metrics.clear()
 
   def get_metrics(self):
     metrics = InstanceCommunication_pb2.MetricsData()
     for metric_name, accumulated_metric in self.accumulated_metrics.items():
-      m = InstanceCommunication_pb2.MetricsData.DataDigest()
-      m.count = accumulated_metric.count
-      m.sum = accumulated_metric.sum
-      m.max = accumulated_metric.max
-      m.min = accumulated_metric.min
-      metrics.metrics[metric_name] = m
+      metrics.metrics[metric_name].count = accumulated_metric.count
+      metrics.metrics[metric_name].sum = accumulated_metric.sum
+      metrics.metrics[metric_name].max = accumulated_metric.max
+      metrics.metrics[metric_name].min = accumulated_metric.min
     return metrics
diff --git a/pulsar-functions/instance/src/main/python/function_stats.py 
b/pulsar-functions/instance/src/main/python/function_stats.py
new file mode 100644
index 0000000..13b3f84
--- /dev/null
+++ b/pulsar-functions/instance/src/main/python/function_stats.py
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import traceback
+import time
+
+from prometheus_client import Counter, Summary, Gauge
+
+# We keep track of the following metrics
+class Stats(object):
+  metrics_label_names = ['tenant', 'namespace', 'function', 'instance_id', 
'cluster']
+
+  TOTAL_PROCESSED = 'pulsar_function_processed_total'
+  TOTAL_SUCCESSFULLY_PROCESSED = 'pulsar_function_processed_successfully_total'
+  TOTAL_SYSTEM_EXCEPTIONS = 'pulsar_function_system_exceptions_total'
+  TOTAL_USER_EXCEPTIONS = 'pulsar_function_user_exceptions_total'
+  PROCESS_LATENCY_MS = 'pulsar_function_process_latency_ms'
+  LAST_INVOCATION = 'pulsar_function_last_invocation'
+  TOTAL_RECEIVED = 'pulsar_function_received_total'
+
+  # Declare Prometheus
+  stat_total_processed = Counter(TOTAL_PROCESSED, 'Total number of messages 
processed.', metrics_label_names)
+  stat_total_processed_successfully = Counter(TOTAL_SUCCESSFULLY_PROCESSED,
+                                              'Total number of messages 
processed successfully.', metrics_label_names)
+  stat_total_sys_exceptions = Counter(TOTAL_SYSTEM_EXCEPTIONS, 'Total number 
of system exceptions.',
+                                      metrics_label_names)
+  stat_total_user_exceptions = Counter(TOTAL_USER_EXCEPTIONS, 'Total number of 
user exceptions.',
+                                       metrics_label_names)
+
+  stat_process_latency_ms = Summary(PROCESS_LATENCY_MS, 'Process latency in 
milliseconds.', metrics_label_names)
+
+  stat_last_invocation = Gauge(LAST_INVOCATION, 'The timestamp of the last 
invocation of the function.', metrics_label_names)
+
+  stat_total_received = Counter(TOTAL_RECEIVED, 'Total number of messages 
received from source.', metrics_label_names)
+
+  latest_user_exception = []
+  latest_sys_exception = []
+
+  def add_user_exception(self):
+    self.latest_user_exception.append((traceback.format_exc(), int(time.time() 
* 1000)))
+    if len(self.latest_user_exception) > 10:
+      self.latest_user_exception.pop(0)
+
+  def add_sys_exception(self):
+    self.latest_sys_exception.append((traceback.format_exc(), int(time.time() 
* 1000)))
+    if len(self.latest_sys_exception) > 10:
+      self.latest_sys_exception.pop(0)
+
+  def reset(self, metrics_labels):
+    self.latest_user_exception = []
+    self.latest_sys_exception = []
+    self.stat_total_processed.labels(*metrics_labels)._value.set(0.0)
+    
self.stat_total_processed_successfully.labels(*metrics_labels)._value.set(0.0)
+    self.stat_total_user_exceptions.labels(*metrics_labels)._value.set(0.0)
+    self.stat_total_sys_exceptions.labels(*metrics_labels)._value.set(0.0)
+    self.stat_process_latency_ms.labels(*metrics_labels)._sum.set(0.0)
+    self.stat_process_latency_ms.labels(*metrics_labels)._count.set(0.0)
+    self.stat_last_invocation.labels(*metrics_labels).set(0.0)
+    self.stat_total_received.labels(*metrics_labels)._value.set(0.0)
\ No newline at end of file
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py 
b/pulsar-functions/instance/src/main/python/python_instance.py
index 66d222c..1d56a39 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -31,14 +31,8 @@ try:
 except:
   import queue
 import threading
-from functools import partial
-from collections import namedtuple
-from threading import Timer
-from prometheus_client import Counter, Summary
-import traceback
 import sys
 import re
-
 import pulsar
 import contextimpl
 import Function_pb2
@@ -46,6 +40,11 @@ import log
 import util
 import InstanceCommunication_pb2
 
+from functools import partial
+from collections import namedtuple
+from threading import Timer
+from function_stats import Stats
+
 Log = log.Log
 # Equivalent of the InstanceConfig in Java
 InstanceConfig = namedtuple('InstanceConfig', 'instance_id function_id 
function_version function_details max_buffered_tuples')
@@ -68,55 +67,9 @@ def base64ify(bytes_or_str):
     else:
         return output_bytes
 
-# We keep track of the following metrics
-class Stats(object):
-  metrics_label_names = ['tenant', 'namespace', 'name', 'instance_id']
-
-  TOTAL_PROCESSED = '__function_total_processed__'
-  TOTAL_SUCCESSFULLY_PROCESSED = '__function_total_successfully_processed__'
-  TOTAL_SYSTEM_EXCEPTIONS = '__function_total_system_exceptions__'
-  TOTAL_USER_EXCEPTIONS = '__function_total_user_exceptions__'
-  PROCESS_LATENCY_MS = '__function_process_latency_ms__'
-
-  # Declare Prometheus
-  stat_total_processed = Counter(TOTAL_PROCESSED, 'Total number of messages 
processed.', metrics_label_names)
-  stat_total_processed_successfully = Counter(TOTAL_SUCCESSFULLY_PROCESSED,
-                                              'Total number of messages 
processed successfully.', metrics_label_names)
-  stat_total_sys_exceptions = Counter(TOTAL_SYSTEM_EXCEPTIONS, 'Total number 
of system exceptions.',
-                                      metrics_label_names)
-  stat_total_user_exceptions = Counter(TOTAL_USER_EXCEPTIONS, 'Total number of 
user exceptions.',
-                                       metrics_label_names)
-
-  stats_process_latency_ms = Summary(PROCESS_LATENCY_MS, 'Process latency in 
milliseconds.', metrics_label_names)
-
-  latest_user_exception = []
-  latest_sys_exception = []
-
-  last_invocation_time = 0.0
-
-  def add_user_exception(self):
-    self.latest_sys_exception.append((traceback.format_exc(), int(time.time() 
* 1000)))
-    if len(self.latest_sys_exception) > 10:
-      self.latest_sys_exception.pop(0)
-
-  def add_sys_exception(self):
-    self.latest_sys_exception.append((traceback.format_exc(), int(time.time() 
* 1000)))
-    if len(self.latest_sys_exception) > 10:
-      self.latest_sys_exception.pop(0)
-
-  def reset(self, metrics_labels):
-    self.latest_user_exception = []
-    self.latest_sys_exception = []
-    self.stat_total_processed.labels(*metrics_labels)._value.set(0.0)
-    
self.stat_total_processed_successfully.labels(*metrics_labels)._value.set(0.0)
-    self.stat_total_user_exceptions.labels(*metrics_labels)._value.set(0.0)
-    self.stat_total_sys_exceptions.labels(*metrics_labels)._value.set(0.0)
-    self.stats_process_latency_ms.labels(*metrics_labels)._sum.set(0)
-    self.stats_process_latency_ms.labels(*metrics_labels)._count.set(0);
-    self.last_invocation_time = 0.0
-
 class PythonInstance(object):
-  def __init__(self, instance_id, function_id, function_version, 
function_details, max_buffered_tuples, expected_healthcheck_interval, 
user_code, pulsar_client, secrets_provider):
+  def __init__(self, instance_id, function_id, function_version, 
function_details, max_buffered_tuples,
+               expected_healthcheck_interval, user_code, pulsar_client, 
secrets_provider, cluster_name):
     self.instance_config = InstanceConfig(instance_id, function_id, 
function_version, function_details, max_buffered_tuples)
     self.user_code = user_code
     self.queue = queue.Queue(max_buffered_tuples)
@@ -140,7 +93,10 @@ class PythonInstance(object):
     self.timeout_ms = function_details.source.timeoutMs if 
function_details.source.timeoutMs > 0 else None
     self.expected_healthcheck_interval = expected_healthcheck_interval
     self.secrets_provider = secrets_provider
-    self.metrics_labels = [function_details.tenant, 
function_details.namespace, function_details.name, instance_id]
+    self.metrics_labels = [function_details.tenant,
+                           "%s/%s" % (function_details.tenant, 
function_details.namespace),
+                           "%s/%s/%s" % (function_details.tenant, 
function_details.namespace, function_details.name),
+                           instance_id, cluster_name]
 
   def health_check(self):
     self.last_health_check_ts = time.time()
@@ -210,7 +166,9 @@ class PythonInstance(object):
     except:
       self.function_purefunction = function_kclass
 
-    self.contextimpl = contextimpl.ContextImpl(self.instance_config, Log, 
self.pulsar_client, self.user_code, self.consumers, self.secrets_provider)
+    self.contextimpl = contextimpl.ContextImpl(self.instance_config, Log, 
self.pulsar_client,
+                                               self.user_code, self.consumers,
+                                               self.secrets_provider, 
self.metrics_labels)
     # Now launch a thread that does execution
     self.execution_thread = threading.Thread(target=self.actual_execution)
     self.execution_thread.start()
@@ -242,13 +200,13 @@ class PythonInstance(object):
         try:
           # get user function start time for statistic calculation
           start_time = time.time()
-          self.stats.last_invocation_time = start_time * 1000.0
+          
Stats.stat_last_invocation.labels(*self.metrics_labels).set(start_time * 1000.0)
           if self.function_class is not None:
             output_object = self.function_class.process(input_object, 
self.contextimpl)
           else:
             output_object = self.function_purefunction.process(input_object)
           successfully_executed = True
-          
Stats.stats_process_latency_ms.labels(*self.metrics_labels).observe((time.time()
 - start_time) * 1000.0)
+          
Stats.stat_process_latency_ms.labels(*self.metrics_labels).observe((time.time() 
- start_time) * 1000.0)
           Stats.stat_total_processed.labels(*self.metrics_labels).inc()
         except Exception as e:
           Log.exception("Exception while executing user method")
@@ -310,6 +268,8 @@ class PythonInstance(object):
         max_pending_messages=100000)
 
   def message_listener(self, serde, consumer, message):
+    # increment number of received records from source
+    Stats.stat_total_received.labels(*self.metrics_labels).inc()
     item = InternalMessage(message, consumer.topic(), serde, consumer)
     self.queue.put(item, True)
     if self.atmost_once and self.auto_ack:
@@ -329,14 +289,16 @@ class PythonInstance(object):
     # First get any user metrics
     metrics = self.contextimpl.get_metrics()
     # Now add system metrics as well
-    self.add_system_metrics("__total_processed__", 
Stats.stat_total_processed.labels(*self.metrics_labels)._value.get(), metrics)
-    self.add_system_metrics("__total_successfully_processed__", 
Stats.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get(),
 metrics)
-    self.add_system_metrics("__total_system_exceptions__", 
Stats.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get(), 
metrics)
-    self.add_system_metrics("__total_user_exceptions__", 
Stats.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get(), 
metrics)
-    self.add_system_metrics("__avg_latency_ms__",
-                            0.0 if 
Stats.stats_process_latency_ms.labels(*self.metrics_labels)._count.get() <= 0.0
-                            else 
Stats.stats_process_latency_ms.labels(*self.metrics_labels)._sum.get() / 
Stats.stats_process_latency_ms.labels(*self.metrics_labels)._count.get(),
+    self.add_system_metrics(Stats.TOTAL_PROCESSED, 
Stats.stat_total_processed.labels(*self.metrics_labels)._value.get(), metrics)
+    self.add_system_metrics(Stats.TOTAL_SUCCESSFULLY_PROCESSED, 
Stats.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get(),
 metrics)
+    self.add_system_metrics(Stats.TOTAL_SYSTEM_EXCEPTIONS, 
Stats.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get(), 
metrics)
+    self.add_system_metrics(Stats.TOTAL_USER_EXCEPTIONS, 
Stats.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get(), 
metrics)
+    self.add_system_metrics(Stats.PROCESS_LATENCY_MS,
+                            0.0 if 
Stats.stat_process_latency_ms.labels(*self.metrics_labels)._count.get() <= 0.0
+                            else 
Stats.stat_process_latency_ms.labels(*self.metrics_labels)._sum.get() / 
Stats.stat_process_latency_ms.labels(*self.metrics_labels)._count.get(),
                             metrics)
+    self.add_system_metrics(Stats.TOTAL_RECEIVED, 
Stats.stat_total_received.labels(*self.metrics_labels)._value.get(), metrics)
+    self.add_system_metrics(Stats.LAST_INVOCATION, 
Stats.stat_last_invocation.labels(*self.metrics_labels)._value.get(), metrics)
     return metrics
 
   def add_system_metrics(self, metric_name, value, metrics):
@@ -348,23 +310,32 @@ class PythonInstance(object):
   def get_function_status(self):
     status = InstanceCommunication_pb2.FunctionStatus()
     status.running = True
-    status.numProcessed = 
long(Stats.stat_total_processed.labels(*self.metrics_labels)._value.get())
-    status.numSuccessfullyProcessed = 
long(Stats.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get())
-    status.numUserExceptions = 
long(Stats.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get())
+
+    total_processed = 
Stats.stat_total_processed.labels(*self.metrics_labels)._value.get()
+    stat_total_processed_successfully = 
Stats.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get()
+    stat_total_user_exceptions = 
Stats.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get()
+    stat_total_sys_exceptions = 
Stats.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get()
+    stat_process_latency_ms_count = 
Stats.stat_process_latency_ms.labels(*self.metrics_labels)._count.get()
+    stat_process_latency_ms_sum = 
Stats.stat_process_latency_ms.labels(*self.metrics_labels)._sum.get()
+    stat_last_invocation = 
Stats.stat_last_invocation.labels(*self.metrics_labels)._value.get()
+
+    status.numProcessed =  int(total_processed) if sys.version_info.major >= 3 
else long(total_processed)
+    status.numSuccessfullyProcessed = int(stat_total_processed_successfully) 
if sys.version_info.major >= 3 else long(stat_total_processed_successfully)
+    status.numUserExceptions = int(stat_total_user_exceptions) if 
sys.version_info.major >= 3 else long(stat_total_user_exceptions)
     status.instanceId = self.instance_config.instance_id
     for ex, tm in self.stats.latest_user_exception:
       to_add = status.latestUserExceptions.add()
       to_add.exceptionString = ex
       to_add.msSinceEpoch = tm
-    status.numSystemExceptions = 
long(Stats.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get())
+    status.numSystemExceptions = int(stat_total_sys_exceptions) if 
sys.version_info.major >= 3 else long(stat_total_sys_exceptions)
     for ex, tm in self.stats.latest_sys_exception:
       to_add = status.latestSystemExceptions.add()
       to_add.exceptionString = ex
       to_add.msSinceEpoch = tm
     status.averageLatency = 0.0 \
-      if 
Stats.stats_process_latency_ms.labels(*self.metrics_labels)._count.get() <= 0.0 
\
-      else 
Stats.stats_process_latency_ms.labels(*self.metrics_labels)._sum.get() / 
Stats.stats_process_latency_ms.labels(*self.metrics_labels)._count.get()
-    status.lastInvocationTime = long(self.stats.last_invocation_time)
+      if stat_process_latency_ms_count <= 0.0 \
+      else stat_process_latency_ms_sum / stat_process_latency_ms_count
+    status.lastInvocationTime = int(stat_last_invocation) if 
sys.version_info.major >= 3 else long(stat_last_invocation)
     return status
 
   def join(self):
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py 
b/pulsar-functions/instance/src/main/python/python_instance_main.py
index d256269..2a02380 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -39,6 +39,8 @@ import log
 import server
 import python_instance
 import util
+import prometheus_client
+
 from google.protobuf import json_format
 
 to_run = True
@@ -69,6 +71,7 @@ def main():
   parser.add_argument('--hostname_verification_enabled', required=False, 
help='Enable hostname verification')
   parser.add_argument('--tls_trust_cert_path', required=False, help='Tls trust 
cert file path')
   parser.add_argument('--port', required=True, help='Instance Port', type=int)
+  parser.add_argument('--metrics_port', required=True, help="Port metrics will 
be exposed on", type=int)
   parser.add_argument('--max_buffered_tuples', required=True, help='Maximum 
number of Buffered tuples')
   parser.add_argument('--logging_directory', required=True, help='Logging 
Directory')
   parser.add_argument('--logging_file', required=True, help='Log file name')
@@ -79,6 +82,7 @@ def main():
   parser.add_argument('--install_usercode_dependencies', required=False, 
help='For packaged python like wheel files, do we need to install all 
dependencies', type=bool)
   parser.add_argument('--dependency_repository', required=False, help='For 
packaged python like wheel files, which repository to pull the dependencies 
from')
   parser.add_argument('--extra_dependency_repository', required=False, 
help='For packaged python like wheel files, any extra repository to pull the 
dependencies from')
+  parser.add_argument('--cluster_name', required=True, help='The name of the 
cluster this instance is running on')
 
   args = parser.parse_args()
   function_details = Function_pb2.FunctionDetails()
@@ -166,10 +170,12 @@ def main():
                                               str(args.function_version), 
function_details,
                                               int(args.max_buffered_tuples),
                                               
int(args.expected_healthcheck_interval),
-                                              str(args.py), pulsar_client, 
secrets_provider)
+                                              str(args.py), pulsar_client, 
secrets_provider, args.cluster_name)
   pyinstance.run()
   server_instance = server.serve(args.port, pyinstance)
 
+  prometheus_client.start_http_server(args.metrics_port)
+
   global to_run
   while to_run:
     time.sleep(1)
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index e3e32fd..0ac3502 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -30,6 +30,7 @@ import java.util.ArrayList;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
+import io.prometheus.client.CollectorRegistry;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
@@ -75,7 +76,7 @@ public class ContextImplTest {
             logger,
             client,
             new ArrayList<>(),
-            new EnvironmentBasedSecretsProvider()
+            new EnvironmentBasedSecretsProvider(), new CollectorRegistry(), 
new String[0]
         );
     }
 
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
index 80b3b1d..b927c49 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
@@ -56,7 +56,7 @@ public class JavaInstanceRunnableTest {
     private JavaInstanceRunnable createRunnable(boolean addCustom, String 
outputSerde) throws Exception {
         InstanceConfig config = createInstanceConfig(addCustom, outputSerde);
         JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
-                config, null, null, null, null, null);
+                config, null, null, null, null, null, null);
         return javaInstanceRunnable;
     }
 
diff --git a/pulsar-functions/instance/src/test/python/test_python_instance.py 
b/pulsar-functions/instance/src/test/python/test_python_instance.py
index 748e5d8..b865a9d 100644
--- a/pulsar-functions/instance/src/test/python/test_python_instance.py
+++ b/pulsar-functions/instance/src/test/python/test_python_instance.py
@@ -48,7 +48,7 @@ class TestContextImpl(unittest.TestCase):
     pulsar_client.create_producer = Mock(return_value=producer)
     user_code=__file__
     consumers = None
-    context_impl = ContextImpl(instance_config, logger, pulsar_client, 
user_code, consumers, None)
+    context_impl = ContextImpl(instance_config, logger, pulsar_client, 
user_code, consumers, None, None)
 
     context_impl.publish("test_topic_name", "test_message")
 
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 98986dd..d5fb80e 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -29,8 +29,17 @@ import com.google.protobuf.util.JsonFormat;
 import io.grpc.Server;
 import io.grpc.ServerBuilder;
 import io.grpc.stub.StreamObserver;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.exporter.HTTPServer;
+import io.prometheus.client.hotspot.BufferPoolsExports;
+import io.prometheus.client.hotspot.ClassLoadingExports;
 import io.prometheus.client.hotspot.DefaultExports;
+import io.prometheus.client.hotspot.GarbageCollectorExports;
+import io.prometheus.client.hotspot.MemoryPoolsExports;
+import io.prometheus.client.hotspot.StandardExports;
+import io.prometheus.client.hotspot.ThreadExports;
+import io.prometheus.client.hotspot.VersionInfoExports;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
@@ -43,12 +52,14 @@ import 
org.apache.pulsar.functions.secretsprovider.SecretsProvider;
 import org.apache.pulsar.functions.utils.Reflections;
 
 import java.lang.reflect.Type;
+import java.net.InetSocketAddress;
 import java.util.Map;
 import java.util.TimerTask;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
 /**
  * A function container implemented using java thread.
@@ -99,6 +110,9 @@ public class JavaInstanceMain implements AutoCloseable {
     @Parameter(names = "--port", description = "Port to listen on\n", required 
= true)
     protected int port;
 
+    @Parameter(names = "--metrics_port", description = "Port metrics will be 
exposed on\n", required = true)
+    protected int metrics_port;
+
     @Parameter(names = "--max_buffered_tuples", description = "Maximum number 
of tuples to buffer\n", required = true)
     protected int maxBufferedTuples;
 
@@ -111,11 +125,15 @@ public class JavaInstanceMain implements AutoCloseable {
     @Parameter(names = "--secrets_provider_config", description = "The config 
that needs to be passed to secrets provider", required = false)
     protected String secretsProviderConfig;
 
+    @Parameter(names = "--cluster_name", description = "The name of the 
cluster this instance is running on", required = true)
+    protected String clusterName;
+
     private Server server;
     private RuntimeSpawner runtimeSpawner;
     private ThreadRuntimeFactory containerFactory;
     private Long lastHealthCheckTs = null;
     private ScheduledExecutorService timer;
+    private HTTPServer metricsServer;
 
     public JavaInstanceMain() { }
 
@@ -126,6 +144,7 @@ public class JavaInstanceMain implements AutoCloseable {
         instanceConfig.setFunctionVersion(functionVersion);
         instanceConfig.setInstanceId(instanceId);
         instanceConfig.setMaxBufferedTuples(maxBufferedTuples);
+        instanceConfig.setClusterName(clusterName);
         FunctionDetails.Builder functionDetailsBuilder = 
FunctionDetails.newBuilder();
         if (functionDetailsJsonString.charAt(0) == '\'') {
             functionDetailsJsonString = functionDetailsJsonString.substring(1);
@@ -162,6 +181,9 @@ public class JavaInstanceMain implements AutoCloseable {
         }
         secretsProvider.init(secretsProviderConfigMap);
 
+        // Collector Registry for prometheus metrics
+        CollectorRegistry collectorRegistry = new CollectorRegistry();
+
         containerFactory = new ThreadRuntimeFactory("LocalRunnerThreadGroup", 
pulsarServiceUrl,
                 stateStorageServiceUrl,
                 
AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthenticationPlugin)
@@ -169,7 +191,7 @@ public class JavaInstanceMain implements AutoCloseable {
                         
.tlsAllowInsecureConnection(isTrue(tlsAllowInsecureConnection))
                         
.tlsHostnameVerificationEnable(isTrue(tlsHostNameVerificationEnabled))
                         .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
-                secretsProvider);
+                secretsProvider, collectorRegistry);
         runtimeSpawner = new RuntimeSpawner(
                 instanceConfig,
                 jarFile,
@@ -195,12 +217,13 @@ public class JavaInstanceMain implements AutoCloseable {
             }
         });
 
-        // registering jvm metrics to prometheus
-        DefaultExports.initialize();
-
         log.info("Starting runtimeSpawner");
         runtimeSpawner.start();
 
+        // starting metrics server
+        log.info("Starting metrics server on port {}", metrics_port);
+        metricsServer = new HTTPServer(new InetSocketAddress(metrics_port), 
collectorRegistry, true);
+
         if (expectedHealthCheckInterval > 0) {
             timer = Executors.newSingleThreadScheduledExecutor();
             timer.scheduleAtFixedRate(new TimerTask() {
@@ -253,6 +276,9 @@ public class JavaInstanceMain implements AutoCloseable {
             if (containerFactory != null) {
                 containerFactory.close();
             }
+            if (metricsServer != null) {
+                metricsServer.stop();
+            }
         } catch (Exception ex) {
             System.err.println(ex);
         }
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index 6e716f2..4dc97ac 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -88,9 +88,7 @@ class KubernetesRuntime implements Runtime {
     private static final String ENV_SHARD_ID = "SHARD_ID";
     private static final int maxJobNameSize = 55;
     private static final Integer GRPC_PORT = 9093;
-    private static final Integer PROMETHEUS_PORT = 9094;
-    private static final Double prometheusMetricsServerCpu = 0.1;
-    private static final Long prometheusMetricsServerRam = 125000000l;
+    private static final Integer METRICS_PORT = 9094;
     public static final Pattern VALID_POD_NAME_REGEX =
             
Pattern.compile("[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*",
                     Pattern.CASE_INSENSITIVE);
@@ -110,7 +108,6 @@ class KubernetesRuntime implements Runtime {
     // The thread that invokes the function
     @Getter
     private List<String> processArgs;
-    private List<String> prometheusMetricsServerArgs;
     @Getter
     private ManagedChannel[] channel;
     private InstanceControlGrpc.InstanceControlFutureStub[] stub;
@@ -190,8 +187,8 @@ class KubernetesRuntime implements Runtime {
             secretsProviderConfig,
             installUserCodeDependencies,
             pythonDependencyRepository,
-            pythonExtraDependencyRepository);
-        this.prometheusMetricsServerArgs = 
composePrometheusMetricsServerArgs(prometheusMetricsServerJarFile, 
expectedMetricsInterval);
+            pythonExtraDependencyRepository,
+                METRICS_PORT);
         running = false;
         doChecks(instanceConfig.getFunctionDetails());
     }
@@ -453,14 +450,6 @@ class KubernetesRuntime implements Runtime {
         );
     }
 
-    protected List<String> getPrometheusMetricsServerCommand() {
-        return Arrays.asList(
-                "sh",
-                "-c",
-                String.join(" ", prometheusMetricsServerArgs)
-        );
-    }
-
     private List<String> getDownloadCommand(String bkPath, String 
userCodeFilePath) {
         return Arrays.asList(
                 pulsarRootDir + "/bin/pulsar-admin",
@@ -525,15 +514,16 @@ class KubernetesRuntime implements Runtime {
     private Map<String, String> getPrometheusAnnotations() {
         final Map<String, String> annotations = new HashMap<>();
         annotations.put("prometheus.io/scrape", "true");
-        annotations.put("prometheus.io/port", String.valueOf(PROMETHEUS_PORT));
+        annotations.put("prometheus.io/port", String.valueOf(METRICS_PORT));
         return annotations;
     }
 
     private Map<String, String> getLabels(Function.FunctionDetails 
functionDetails) {
         final Map<String, String> labels = new HashMap<>();
-        labels.put("app", createJobName(functionDetails));
-        labels.put("namespace", functionDetails.getNamespace());
+        labels.put("namespace", 
String.format("%s/%s",functionDetails.getTenant(), 
functionDetails.getNamespace()));
         labels.put("tenant", functionDetails.getTenant());
+        labels.put("function", String.format("%s/%s/%s", 
functionDetails.getTenant(),
+                functionDetails.getNamespace(), functionDetails.getName()));
         if (customLabels != null && !customLabels.isEmpty()) {
             labels.putAll(customLabels);
         }
@@ -552,7 +542,6 @@ class KubernetesRuntime implements Runtime {
 
         List<V1Container> containers = new LinkedList<>();
         containers.add(getFunctionContainer(instanceCommand, resource));
-        containers.add(getPrometheusContainer());
         podSpec.containers(containers);
 
         // Configure secrets
@@ -607,38 +596,6 @@ class KubernetesRuntime implements Runtime {
         return container;
     }
 
-    private V1Container getPrometheusContainer() {
-        final V1Container container = new 
V1Container().name("prometheusmetricsserver");
-
-        // set up the container images
-        container.setImage(pulsarDockerImageName);
-
-        // set up the container command
-        container.setCommand(getPrometheusMetricsServerCommand());
-
-        // setup the environment variables for the container
-        final V1EnvVar envVarPodName = new V1EnvVar();
-        envVarPodName.name("POD_NAME")
-                .valueFrom(new V1EnvVarSource()
-                        .fieldRef(new V1ObjectFieldSelector()
-                                .fieldPath("metadata.name")));
-        container.setEnv(Arrays.asList(envVarPodName));
-
-
-        // set container resources
-        final V1ResourceRequirements resourceRequirements = new 
V1ResourceRequirements();
-        final Map<String, Quantity> requests = new HashMap<>();
-        requests.put("memory", 
Quantity.fromString(Long.toString(prometheusMetricsServerRam)));
-        requests.put("cpu", 
Quantity.fromString(Double.toString(prometheusMetricsServerCpu)));
-        resourceRequirements.setRequests(requests);
-        container.setResources(resourceRequirements);
-
-        // set container ports
-        container.setPorts(getPrometheusContainerPorts());
-
-        return container;
-    }
-
     private List<V1ContainerPort> getFunctionContainerPorts() {
         List<V1ContainerPort> ports = new ArrayList<>();
         final V1ContainerPort port = new V1ContainerPort();
@@ -652,7 +609,7 @@ class KubernetesRuntime implements Runtime {
         List<V1ContainerPort> ports = new ArrayList<>();
         final V1ContainerPort port = new V1ContainerPort();
         port.setName("prometheus");
-        port.setContainerPort(PROMETHEUS_PORT);
+        port.setContainerPort(METRICS_PORT);
         ports.add(port);
         return ports;
     }
@@ -680,24 +637,4 @@ class KubernetesRuntime implements Runtime {
             throw new RuntimeException("Kubernetes job name size should be 
less than " + maxJobNameSize);
         }
     }
-
-    private List<String> composePrometheusMetricsServerArgs(String 
prometheusMetricsServerFile,
-                                                            Integer 
expectedMetricsInterval) throws Exception {
-        List<String> args = new LinkedList<>();
-        args.add("java");
-        args.add("-cp");
-        args.add(prometheusMetricsServerFile);
-        
args.add("-Dlog4j.configurationFile=prometheus_metricsserver_log4j2.yml");
-        args.add("-Xmx" + String.valueOf(prometheusMetricsServerRam));
-        args.add(PrometheusMetricsServer.class.getName());
-        args.add("--function_details");
-        args.add("'" + 
JsonFormat.printer().omittingInsignificantWhitespace().print(instanceConfig.getFunctionDetails())
 + "'");
-        args.add("--prometheus_port");
-        args.add(String.valueOf(PROMETHEUS_PORT));
-        args.add("--grpc_port");
-        args.add(String.valueOf(GRPC_PORT));
-        args.add("--collection_interval");
-        args.add(String.valueOf(expectedMetricsInterval));
-        return args;
-    }
 }
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
index 958dbcf..c198cc3 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
@@ -193,6 +193,7 @@ public class LocalRunner {
                 instanceConfig.setInstanceId(i + instanceIdOffset);
                 instanceConfig.setMaxBufferedTuples(1024);
                 instanceConfig.setPort(Utils.findAvailablePort());
+                instanceConfig.setClusterName("local");
                 RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
                         instanceConfig,
                         userCodeFile,
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index d06e90f..e39c7ad 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -36,6 +36,7 @@ import 
org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.proto.InstanceControlGrpc;
 import 
org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
+import org.apache.pulsar.functions.utils.Utils;
 
 import java.io.InputStream;
 import java.util.List;
@@ -117,7 +118,8 @@ class ProcessRuntime implements Runtime {
             secretsProviderConfig,
             false,
             null,
-            null);
+            null,
+                Utils.findAvailablePort());
     }
 
     /**
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index e84ec20..05f21e6 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -57,7 +57,8 @@ class RuntimeUtils {
                                            String secretsProviderConfig,
                                            Boolean 
installUserCodeDepdendencies,
                                            String pythonDependencyRepository,
-                                           String 
pythonExtraDependencyRepository) throws Exception {
+                                           String 
pythonExtraDependencyRepository,
+                                           int metricsPort) throws Exception {
         List<String> args = new LinkedList<>();
         if (instanceConfig.getFunctionDetails().getRuntime() == 
Function.FunctionDetails.Runtime.JAVA) {
             args.add("java");
@@ -160,6 +161,9 @@ class RuntimeUtils {
         args.add("--port");
         args.add(String.valueOf(grpcPort));
 
+        args.add("--metrics_port");
+        args.add(String.valueOf(metricsPort));
+
         // state storage configs
         if (null != stateStorageServiceUrl
                 && instanceConfig.getFunctionDetails().getRuntime() == 
Function.FunctionDetails.Runtime.JAVA) {
@@ -177,6 +181,9 @@ class RuntimeUtils {
                 args.add("'" + secretsProviderConfig + "'");
             }
         }
+
+        args.add("--cluster_name");
+        args.add(instanceConfig.getClusterName());
         return args;
     }
 }
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
index 5e42c52..913d4ae 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.functions.runtime;
 
 import java.util.concurrent.CompletableFuture;
 
+import io.prometheus.client.CollectorRegistry;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -53,18 +54,28 @@ class ThreadRuntime implements Runtime {
                   String jarFile,
                   PulsarClient pulsarClient,
                   String stateStorageServiceUrl,
-                  SecretsProvider secretsProvider) {
+                  SecretsProvider secretsProvider,
+                  CollectorRegistry collectorRegistry) {
         this.instanceConfig = instanceConfig;
         if (instanceConfig.getFunctionDetails().getRuntime() != 
Function.FunctionDetails.Runtime.JAVA) {
             throw new RuntimeException("Thread Container only supports Java 
Runtime");
         }
+
+        // if collector registry is not set, create one for this thread.
+        // since each thread / instance needs its own collector registry 
for metrics collection
+        CollectorRegistry instanceCollectorRegistry = collectorRegistry;
+        if (instanceCollectorRegistry == null) {
+            instanceCollectorRegistry = new CollectorRegistry();
+        }
+
         this.javaInstanceRunnable = new JavaInstanceRunnable(
             instanceConfig,
             fnCache,
             jarFile,
             pulsarClient,
             stateStorageServiceUrl,
-            secretsProvider);
+            secretsProvider,
+            instanceCollectorRegistry);
         this.threadGroup = threadGroup;
     }
 
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
index 846028d..ed76117 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.functions.runtime;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import io.prometheus.client.CollectorRegistry;
 import lombok.extern.slf4j.Slf4j;
 
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
@@ -45,21 +46,23 @@ public class ThreadRuntimeFactory implements RuntimeFactory 
{
     private final PulsarClient pulsarClient;
     private final String storageServiceUrl;
     private final SecretsProvider secretsProvider;
+    private final CollectorRegistry collectorRegistry;
     private volatile boolean closed;
 
     public ThreadRuntimeFactory(String threadGroupName, String 
pulsarServiceUrl, String storageServiceUrl,
-                                AuthenticationConfig authConfig, 
SecretsProvider secretsProvider) throws Exception {
-        this(threadGroupName, createPulsarClient(pulsarServiceUrl, 
authConfig), storageServiceUrl, secretsProvider);
+                                AuthenticationConfig authConfig, 
SecretsProvider secretsProvider, CollectorRegistry collectorRegistry) throws 
Exception {
+        this(threadGroupName, createPulsarClient(pulsarServiceUrl, 
authConfig), storageServiceUrl, secretsProvider, collectorRegistry);
     }
 
     @VisibleForTesting
     public ThreadRuntimeFactory(String threadGroupName, PulsarClient 
pulsarClient, String storageServiceUrl,
-                                SecretsProvider secretsProvider) {
+                                SecretsProvider secretsProvider, 
CollectorRegistry collectorRegistry) {
         this.secretsProvider = secretsProvider;
         this.fnCache = new FunctionCacheManagerImpl();
         this.threadGroup = new ThreadGroup(threadGroupName);
         this.pulsarClient = pulsarClient;
         this.storageServiceUrl = storageServiceUrl;
+        this.collectorRegistry = collectorRegistry;
     }
 
     private static PulsarClient createPulsarClient(String pulsarServiceUrl, 
AuthenticationConfig authConfig)
@@ -94,7 +97,8 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
             jarFile,
             pulsarClient,
             storageServiceUrl,
-            secretsProvider);
+            secretsProvider,
+            collectorRegistry);
     }
 
     @Override
diff --git 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
index 2e00983..c22970e 100644
--- 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
+++ 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
@@ -192,6 +192,7 @@ public class KubernetesRuntimeTest {
         config.setFunctionVersion("1.0");
         config.setInstanceId(0);
         config.setMaxBufferedTuples(1024);
+        config.setClusterName("standalone");
 
         return config;
     }
@@ -233,16 +234,19 @@ public class KubernetesRuntimeTest {
         String classpath = javaInstanceJarFile;
         String extraDepsEnv;
         int portArg;
+        int metricsPortArg;
         int totalArgs;
         if (null != depsDir) {
             extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" + 
depsDir;
             classpath = classpath + ":" + depsDir + "/*";
-            totalArgs = 29;
+            totalArgs = 33;
             portArg = 24;
+            metricsPortArg = 26;
         } else {
             extraDepsEnv = "";
             portArg = 23;
-            totalArgs = 28;
+            metricsPortArg = 25;
+            totalArgs = 32;
         }
         if (secretsAttached) {
             totalArgs += 4;
@@ -263,13 +267,15 @@ public class KubernetesRuntimeTest {
                 + " --function_version " + config.getFunctionVersion()
                 + " --function_details '" + 
JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
                 + "' --pulsar_serviceurl " + pulsarServiceUrl
-                + " --max_buffered_tuples 1024 --port " + args.get(portArg)
+                + " --max_buffered_tuples 1024 --port " + args.get(portArg) + 
" --metrics_port " + args.get(metricsPortArg)
                 + " --state_storage_serviceurl " + stateStorageServiceUrl
                 + " --expected_healthcheck_interval -1";
         if (secretsAttached) {
             expectedArgs += " --secrets_provider 
org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider"
                     + " --secrets_provider_config 
'{\"Somevalue\":\"myvalue\"}'";
         }
+        expectedArgs += " --cluster_name standalone";
+
         assertEquals(String.join(" ", args), expectedArgs);
     }
 
@@ -301,15 +307,18 @@ public class KubernetesRuntimeTest {
         int portArg;
         String pythonPath;
         int configArg;
+        int metricsPortArg;
         if (null == extraDepsDir) {
-            totalArgs = 32;
+            totalArgs = 36;
             portArg = 29;
             configArg = 9;
             pythonPath = "";
+            metricsPortArg = 31;
         } else {
-            totalArgs = 33;
+            totalArgs = 37;
             portArg = 30;
             configArg = 10;
+            metricsPortArg = 32;
             pythonPath = "PYTHONPATH=${PYTHONPATH}:" + extraDepsDir + " ";
         }
         if (secretsAttached) {
@@ -331,12 +340,13 @@ public class KubernetesRuntimeTest {
                 + " --function_version " + config.getFunctionVersion()
                 + " --function_details '" + 
JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
                 + "' --pulsar_serviceurl " + pulsarServiceUrl
-                + " --max_buffered_tuples 1024 --port " + args.get(portArg)
+                + " --max_buffered_tuples 1024 --port " + args.get(portArg) + 
" --metrics_port " + args.get(metricsPortArg)
                 + " --expected_healthcheck_interval -1";
         if (secretsAttached) {
             expectedArgs += " --secrets_provider 
secretsprovider.ClearTextSecretsProvider"
                     + " --secrets_provider_config 
'{\"Somevalue\":\"myvalue\"}'";
         }
+        expectedArgs += " --cluster_name standalone";
         assertEquals(String.join(" ", args), expectedArgs);
     }
 
diff --git 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index 63875fd..323943c 100644
--- 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -172,6 +172,7 @@ public class ProcessRuntimeTest {
         config.setFunctionVersion("1.0");
         config.setInstanceId(0);
         config.setMaxBufferedTuples(1024);
+        config.setClusterName("standalone");
 
         return config;
     }
@@ -250,15 +251,18 @@ public class ProcessRuntimeTest {
         String classpath = javaInstanceJarFile;
         String extraDepsEnv;
         int portArg;
+        int metricsPortArg;
         if (null != depsDir) {
-            assertEquals(args.size(), 33);
+            assertEquals(args.size(), 37);
             extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" + 
depsDir.toString();
             classpath = classpath + ":" + depsDir + "/*";
             portArg = 24;
+            metricsPortArg = 26;
         } else {
-            assertEquals(args.size(), 32);
+            assertEquals(args.size(), 36);
             extraDepsEnv = "";
             portArg = 23;
+            metricsPortArg = 25;
         }
 
         String expectedArgs = "java -cp " + classpath
@@ -273,11 +277,12 @@ public class ProcessRuntimeTest {
                 + " --function_version " + config.getFunctionVersion()
                 + " --function_details '" + 
JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
                 + "' --pulsar_serviceurl " + pulsarServiceUrl
-                + " --max_buffered_tuples 1024 --port " + args.get(portArg)
+                + " --max_buffered_tuples 1024 --port " + args.get(portArg) + 
" --metrics_port " + args.get(metricsPortArg)
                 + " --state_storage_serviceurl " + stateStorageServiceUrl
                 + " --expected_healthcheck_interval 30"
                 + " --secrets_provider 
org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider"
-                + " --secrets_provider_config '{\"Config\":\"Value\"}'";
+                + " --secrets_provider_config '{\"Config\":\"Value\"}'"
+                + " --cluster_name standalone";
         assertEquals(String.join(" ", args), expectedArgs);
     }
 
@@ -305,8 +310,9 @@ public class ProcessRuntimeTest {
         ProcessRuntime container = factory.createContainer(config, 
userJarFile, null, 30l);
         List<String> args = container.getProcessArgs();
 
-        int totalArgs = 30;
+        int totalArgs = 34;
         int portArg = 23;
+        int metricsPortArg = 25;
         String pythonPath = "";
         int configArg = 9;
 
@@ -319,10 +325,11 @@ public class ProcessRuntimeTest {
                 + " --function_version " + config.getFunctionVersion()
                 + " --function_details '" + 
JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
                 + "' --pulsar_serviceurl " + pulsarServiceUrl
-                + " --max_buffered_tuples 1024 --port " + args.get(portArg)
+                + " --max_buffered_tuples 1024 --port " + args.get(portArg) + 
" --metrics_port " + args.get(metricsPortArg)
                 + " --expected_healthcheck_interval 30"
                 + " --secrets_provider 
secretsprovider.ClearTextSecretsProvider"
-                + " --secrets_provider_config '{\"Config\":\"Value\"}'";
+                + " --secrets_provider_config '{\"Config\":\"Value\"}'"
+                + " --cluster_name standalone";
         assertEquals(String.join(" ", args), expectedArgs);
     }
 
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index 4f8d04a..46343a5 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -172,6 +172,7 @@ public class FunctionActioner implements AutoCloseable {
         instanceConfig.setInstanceId(instanceId);
         instanceConfig.setMaxBufferedTuples(1024);
         
instanceConfig.setPort(org.apache.pulsar.functions.utils.Utils.findAvailablePort());
+        
instanceConfig.setClusterName(workerConfig.getPulsarFunctionsCluster());
 
         log.info("{}/{}/{}-{} start process with instance config {}", 
functionDetails.getTenant(), functionDetails.getNamespace(),
                 functionDetails.getName(), instanceId, instanceConfig);
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 25b97ce..8a8fafa 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -36,6 +36,7 @@ import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriBuilder;
 
+import io.prometheus.client.CollectorRegistry;
 import lombok.Setter;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.distributedlog.api.namespace.Namespace;
@@ -130,7 +131,8 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
                     workerConfig.getPulsarServiceUrl(),
                     workerConfig.getStateStorageServiceUrl(),
                     authConfig,
-                    new ClearTextSecretsProvider());
+                    new ClearTextSecretsProvider(),
+                     null);
         } else if (workerConfig.getProcessContainerFactory() != null) {
             this.runtimeFactory = new ProcessRuntimeFactory(
                     workerConfig.getPulsarServiceUrl(),
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
index e497ff3..d2f1504 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
@@ -73,13 +73,13 @@ public class FunctionsStatsGenerator {
                                 int instanceId = 
functionRuntimeInfo.getFunctionInstance().getInstanceId();
                                 String qualifiedNamespace = 
String.format("%s/%s", tenant, namespace);
 
-                                metric(out, cluster, qualifiedNamespace, name, 
String.format("%scount", metricName),
+                                metric(out, cluster, qualifiedNamespace, name, 
String.format("%s_count", metricName),
                                         instanceId, dataDigest.getCount());
-                                metric(out, cluster, qualifiedNamespace, name, 
String.format("%smax", metricName),
+                                metric(out, cluster, qualifiedNamespace, name, 
String.format("%s_max", metricName),
                                         instanceId, dataDigest.getMax());
-                                metric(out, cluster, qualifiedNamespace,name, 
String.format("%smin", metricName),
+                                metric(out, cluster, qualifiedNamespace,name, 
String.format("%s_min", metricName),
                                         instanceId, dataDigest.getMin());
-                                metric(out, cluster, qualifiedNamespace, name, 
String.format("%ssum", metricName),
+                                metric(out, cluster, qualifiedNamespace, name, 
String.format("%s_sum", metricName),
                                         instanceId, dataDigest.getSum());
 
                             }
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
index e35aa2b..c957de7 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
@@ -87,10 +87,10 @@ public class FunctionStatsGeneratorTest {
         CompletableFuture<InstanceCommunication.MetricsData> 
metricsDataCompletableFuture = new CompletableFuture<>();
         InstanceCommunication.MetricsData metricsData = 
InstanceCommunication.MetricsData.newBuilder()
                 .putMetrics(
-                        "__function_total_processed__",
+                        "pulsar_function_processed_total",
                         
InstanceCommunication.MetricsData.DataDigest.newBuilder()
                                 
.setCount(100.0).setMax(200.0).setSum(300.0).setMin(0.0).build())
-                .putMetrics("__function_process_latency_ms__",
+                .putMetrics("pulsar_function_process_latency_ms",
                         
InstanceCommunication.MetricsData.DataDigest.newBuilder()
                                 
.setCount(10.0).setMax(20.0).setSum(30.0).setMin(0.0).build())
                 .build();
@@ -127,56 +127,56 @@ public class FunctionStatsGeneratorTest {
         Assert.assertEquals(metrics.size(), 8);
 
         System.out.println("metrics: " + metrics);
-        Metric m = metrics.get("__function_total_processed__count");
+        Metric m = metrics.get("pulsar_function_processed_total_count");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
         assertEquals(m.tags.get("name"), "func-1");
         assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
         assertEquals(m.value, 100.0);
 
-        m = metrics.get("__function_total_processed__max");
+        m = metrics.get("pulsar_function_processed_total_max");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
         assertEquals(m.tags.get("name"), "func-1");
         assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
         assertEquals(m.value, 200.0);
 
-        m = metrics.get("__function_total_processed__sum");
+        m = metrics.get("pulsar_function_processed_total_sum");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
         assertEquals(m.tags.get("name"), "func-1");
         assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
         assertEquals(m.value, 300.0);
 
-        m = metrics.get("__function_total_processed__min");
+        m = metrics.get("pulsar_function_processed_total_min");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
         assertEquals(m.tags.get("name"), "func-1");
         assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
         assertEquals(m.value, 0.0);
 
-        m = metrics.get("__function_process_latency_ms__count");
+        m = metrics.get("pulsar_function_process_latency_ms_count");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
         assertEquals(m.tags.get("name"), "func-1");
         assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
         assertEquals(m.value, 10.0);
 
-        m = metrics.get("__function_process_latency_ms__max");
+        m = metrics.get("pulsar_function_process_latency_ms_max");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
         assertEquals(m.tags.get("name"), "func-1");
         assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
         assertEquals(m.value, 20.0);
 
-        m = metrics.get("__function_process_latency_ms__sum");
+        m = metrics.get("pulsar_function_process_latency_ms_sum");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
         assertEquals(m.tags.get("name"), "func-1");
         assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
         assertEquals(m.value, 30.0);
 
-        m = metrics.get("__function_process_latency_ms__min");
+        m = metrics.get("pulsar_function_process_latency_ms_min");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
         assertEquals(m.tags.get("name"), "func-1");
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 066ca81..ffd1bd6 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.functions.worker;
 import com.google.common.collect.Sets;
 import com.google.protobuf.InvalidProtocolBufferException;
 import io.netty.util.concurrent.DefaultThreadFactory;
+import io.prometheus.client.CollectorRegistry;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.MessageId;
@@ -128,7 +129,7 @@ public class SchedulerManagerTest {
     public void stop() {
         this.executor.shutdown();
     }
-    
+
     @Test
     public void testSchedule() throws Exception {
 
@@ -141,7 +142,7 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function1);
         
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, 
"dummy", new ClearTextSecretsProvider());
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, 
"dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -187,7 +188,7 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function1);
         
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, 
"dummy", new ClearTextSecretsProvider());
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, 
"dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -234,7 +235,7 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function2);
         
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, 
"dummy", new ClearTextSecretsProvider());
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, 
"dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -294,7 +295,7 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function1);
         
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, 
"dummy", new ClearTextSecretsProvider());
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, 
"dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -359,7 +360,8 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function2);
         
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, 
"dummy", new ClearTextSecretsProvider());
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, 
"dummy", new ClearTextSecretsProvider
+                (), new CollectorRegistry());
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -470,7 +472,7 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function2);
         
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, 
"dummy", new ClearTextSecretsProvider());
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, 
"dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -597,7 +599,7 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function2);
         
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, 
"dummy", new ClearTextSecretsProvider());
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, 
"dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         Map<String, Map<String, Function.Assignment>> currentAssignments = new 
HashMap<>();
@@ -651,7 +653,7 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function2);
         
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, 
"dummy", new ClearTextSecretsProvider());
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, 
"dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -784,7 +786,7 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function2);
         
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, 
"dummy", new ClearTextSecretsProvider());
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, 
"dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments

Reply via email to