This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e0f3cc1 Export Function worker internal stats via Prometheus (#7641)
e0f3cc1 is described below
commit e0f3cc1507b92700dece75c13fd97c0cfc2658a2
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Fri Jul 24 13:19:58 2020 -0700
Export Function worker internal stats via Prometheus (#7641)
Co-authored-by: Jerry Peng <[email protected]>
---
pulsar-functions/worker/pom.xml | 5 +
.../functions/worker/FunctionRuntimeManager.java | 18 +-
.../functions/worker/FunctionsStatsGenerator.java | 12 +
.../pulsar/functions/worker/SchedulerManager.java | 22 +-
.../pulsar/functions/worker/WorkerService.java | 11 +-
.../functions/worker/WorkerStatsManager.java | 250 +++++++++++++++++++++
.../pulsar/functions/worker/rest/WorkerServer.java | 38 ++--
.../worker/FunctionRuntimeManagerTest.java | 13 ++
.../functions/worker/MembershipManagerTest.java | 4 +
.../functions/worker/SchedulerManagerTest.java | 3 +-
10 files changed, 353 insertions(+), 23 deletions(-)
diff --git a/pulsar-functions/worker/pom.xml b/pulsar-functions/worker/pom.xml
index 58589e9..78b33e2 100644
--- a/pulsar-functions/worker/pom.xml
+++ b/pulsar-functions/worker/pom.xml
@@ -144,6 +144,11 @@
</exclusions>
</dependency>
+ <dependency>
+ <groupId>io.prometheus</groupId>
+ <artifactId>simpleclient_jetty</artifactId>
+ </dependency>
+
<!-- functions related dependencies (end) -->
<dependency>
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 48c63af..852a2b2 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -141,11 +141,13 @@ public class FunctionRuntimeManager implements
AutoCloseable{
private final FunctionMetaDataManager functionMetaDataManager;
+ private final WorkerStatsManager workerStatsManager;
+
private final ErrorNotifier errorNotifier;
public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService
workerService, Namespace dlogNamespace,
MembershipManager membershipManager,
ConnectorsManager connectorsManager, FunctionsManager functionsManager,
- FunctionMetaDataManager
functionMetaDataManager, ErrorNotifier errorNotifier) throws Exception {
+ FunctionMetaDataManager
functionMetaDataManager, WorkerStatsManager workerStatsManager, ErrorNotifier
errorNotifier) throws Exception {
this.workerConfig = workerConfig;
this.workerService = workerService;
this.functionAdmin = workerService.getFunctionAdmin();
@@ -213,6 +215,7 @@ public class FunctionRuntimeManager implements
AutoCloseable{
this.membershipManager = membershipManager;
this.functionMetaDataManager = functionMetaDataManager;
+ this.workerStatsManager = workerStatsManager;
this.errorNotifier = errorNotifier;
}
@@ -920,19 +923,32 @@ public class FunctionRuntimeManager implements
AutoCloseable{
private void conditionallyStartFunction(FunctionRuntimeInfo
functionRuntimeInfo) {
if (!this.isInitializePhase) {
+ workerStatsManager.startInstanceProcessTimeStart();
this.functionActioner.startFunction(functionRuntimeInfo);
+ workerStatsManager.startInstanceProcessTimeEnd();
}
}
private void conditionallyStopFunction(FunctionRuntimeInfo
functionRuntimeInfo) {
if (!this.isInitializePhase) {
+ workerStatsManager.stopInstanceProcessTimeStart();
this.functionActioner.stopFunction(functionRuntimeInfo);
+ workerStatsManager.stopInstanceProcessTimeEnd();
}
}
private void conditionallyTerminateFunction(FunctionRuntimeInfo
functionRuntimeInfo) {
if (!this.isInitializePhase) {
+ workerStatsManager.startInstanceProcessTimeStart();
this.functionActioner.terminateFunction(functionRuntimeInfo);
+ workerStatsManager.startInstanceProcessTimeEnd();
}
}
+
+ /** Methods for metrics **/
+
+ public int getMyInstances() {
+ Map<String, Assignment> myAssignments =
workerIdToAssignments.get(workerConfig.getWorkerId());
+ return myAssignments == null ? 0 : myAssignments.size();
+ }
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
index 327d694..142caf6 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
@@ -38,6 +38,18 @@ public class FunctionsStatsGenerator {
public static void generate(WorkerService workerService, String cluster,
SimpleTextOutputStream out) {
// only when worker service is initialized, we generate the stats.
otherwise we will get bunch of NPE.
if (workerService != null && workerService.isInitialized()) {
+
+ /* worker internal stats */
+
+ try {
+
out.write(workerService.getWorkerStatsManager().getStatsAsString());
+ } catch (IOException e) {
+ log.warn("Encountered error when generating metrics for worker
{}",
+ workerService.getWorkerConfig().getWorkerId(), e);
+ }
+
+ /* function stats */
+
// kubernetes runtime factory doesn't support stats collection
through worker service
if (workerService.getFunctionRuntimeManager().getRuntimeFactory()
instanceof KubernetesRuntimeFactory) {
return;
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index 469256b..21961cf 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -81,6 +81,7 @@ public class SchedulerManager implements AutoCloseable {
private final WorkerConfig workerConfig;
private final ErrorNotifier errorNotifier;
+ private final WorkerStatsManager workerStatsManager;
private ThreadPoolExecutor executorService;
private final PulsarClient pulsarClient;
@@ -124,14 +125,15 @@ public class SchedulerManager implements AutoCloseable {
public SchedulerManager(WorkerConfig workerConfig,
PulsarClient pulsarClient,
PulsarAdmin admin,
+ WorkerStatsManager workerStatsManager,
ErrorNotifier errorNotifier) {
this.workerConfig = workerConfig;
this.pulsarClient = pulsarClient;
this.admin = admin;
this.scheduler =
Reflections.createInstance(workerConfig.getSchedulerClassName(),
IScheduler.class,
Thread.currentThread().getContextClassLoader());
+ this.workerStatsManager = workerStatsManager;
this.errorNotifier = errorNotifier;
-
}
private static Producer<byte[]> createProducer(PulsarClient client,
WorkerConfig config) {
@@ -225,11 +227,19 @@ public class SchedulerManager implements AutoCloseable {
}
public Future<?> schedule() {
- return scheduleInternal(() -> invokeScheduler(), "Encountered error
when invoking scheduler");
+ return scheduleInternal(() -> {
+ workerStatsManager.scheduleTotalExecTimeStart();
+ invokeScheduler();
+ workerStatsManager.scheduleTotalExecTimeEnd();
+ }, "Encountered error when invoking scheduler");
}
private Future<?> rebalance() {
- return scheduleInternal(() -> invokeRebalance(), "Encountered error
when invoking rebalance");
+ return scheduleInternal(() -> {
+ workerStatsManager.rebalanceTotalExecTimeStart();
+ invokeRebalance();
+ workerStatsManager.rebalanceTotalExecTimeEnd();
+ }, "Encountered error when invoking rebalance");
}
public Future<?> rebalanceIfNotInprogress() {
@@ -325,7 +335,10 @@ public class SchedulerManager implements AutoCloseable {
Pair<List<Function.Instance>, List<Assignment>> unassignedInstances
= getUnassignedFunctionInstances(workerIdToAssignments,
allInstances);
+ workerStatsManager.scheduleStrategyExecTimeStartStart();
List<Assignment> assignments =
scheduler.schedule(unassignedInstances.getLeft(), currentAssignments,
currentMembership);
+ workerStatsManager.scheduleStrategyExecTimeStartEnd();
+
assignments.addAll(unassignedInstances.getRight());
if (log.isDebugEnabled()) {
@@ -379,8 +392,9 @@ public class SchedulerManager implements AutoCloseable {
.flatMap(stringMapEntry ->
stringMapEntry.getValue().values().stream())
.collect(Collectors.toList());
-
+ workerStatsManager.rebalanceStrategyExecTimeStart();
List<Assignment> rebalancedAssignments =
scheduler.rebalance(currentAssignments, currentMembership);
+ workerStatsManager.rebalanceStrategyExecTimeEnd();
for (Assignment assignment : rebalancedAssignments) {
MessageId messageId = publishNewAssignment(assignment, false);
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 5b493bc..94d3fbd 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -74,19 +74,22 @@ public class WorkerService {
private URI dlogUri;
private LeaderService leaderService;
private FunctionAssignmentTailer functionAssignmentTailer;
+ private final WorkerStatsManager workerStatsManager;
/**
 * Creates the worker service and its statistics infrastructure. Startup of the worker
 * itself happens later, in {@code start(...)}.
 */
public WorkerService(WorkerConfig workerConfig) {
    this.workerConfig = workerConfig;
    // One background thread named "worker-stats-updater", handed to the metrics generator.
    this.statsUpdater = Executors.newSingleThreadScheduledExecutor(
            new DefaultThreadFactory("worker-stats-updater"));
    this.metricsGenerator = new MetricsGenerator(this.statsUpdater, workerConfig);
    // Worker-internal Prometheus stats (startup time, scheduling timings, instance count).
    this.workerStatsManager = new WorkerStatsManager(workerConfig);
}
-
public void start(URI dlogUri,
AuthenticationService authenticationService,
AuthorizationService authorizationService,
ErrorNotifier errorNotifier) throws InterruptedException
{
+
+ workerStatsManager.startupTimeStart();
log.info("/** Starting worker id={} **/", workerConfig.getWorkerId());
try {
@@ -163,7 +166,7 @@ public class WorkerService {
brokerAdmin.topics().createNonPartitionedTopic(workerConfig.getClusterCoordinationTopic());
brokerAdmin.topics().createNonPartitionedTopic(workerConfig.getFunctionMetadataTopic());
//create scheduler manager
- this.schedulerManager = new SchedulerManager(workerConfig, client,
brokerAdmin, errorNotifier);
+ this.schedulerManager = new SchedulerManager(workerConfig, client,
brokerAdmin, workerStatsManager, errorNotifier);
//create function meta data manager
this.functionMetaDataManager = new FunctionMetaDataManager(
@@ -188,6 +191,7 @@ public class WorkerService {
connectorsManager,
functionsManager,
functionMetaDataManager,
+ workerStatsManager,
errorNotifier);
@@ -278,6 +282,9 @@ public class WorkerService {
this.isInitialized = true;
log.info("/** Started worker id={} **/",
workerConfig.getWorkerId());
+
+
workerStatsManager.setFunctionRuntimeManager(functionRuntimeManager);
+ workerStatsManager.startupTimeEnd();
} catch (Throwable t) {
log.error("Error Starting up in worker", t);
throw new RuntimeException(t);
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
new file mode 100644
index 0000000..a14039e
--- /dev/null
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.worker;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Gauge;
+import io.prometheus.client.Summary;
+import lombok.Setter;
+import org.apache.pulsar.functions.instance.stats.PrometheusTextFormat;
+
+import java.io.IOException;
+import java.io.StringWriter;
+
+public class WorkerStatsManager {
+
+ private static final String PULSAR_FUNCTION_WORKER_METRICS_PREFIX =
"pulsar_function_worker_";
+ private static final String START_UP_TIME = "start_up_time_ms";
+ private static final String INSTANCE_COUNT = "instance_count";
+ private static final String SCHEDULE_TOTAL_EXEC_TIME =
"schedule_execution_time_total_ms";
+ private static final String SCHEDULE_STRATEGY_EXEC_TIME =
"schedule_strategy_execution_time_ms";
+ private static final String REBALANCE_TOTAL_EXEC_TIME =
"rebalance_execution_time_total_ms";
+ private static final String REBALANCE_STRATEGY_EXEC_TIME =
"rebalance_strategy_execution_time_ms";
+ private static final String STOPPING_INSTANCE_PROCESS_TIME =
"stop_instance_process_time_ms";
+ private static final String UPDATING_INSTANCE_PROCESS_TIME =
"update_instance_process_time_ms";
+ private static final String STARTING_INSTANCE_PROCESS_TIME =
"start_instance_process_time_ms";
+
+ private static final String[] metricsLabelNames = {"cluster"};
+ private final String[] metricsLabels;
+
+ @Setter
+ private FunctionRuntimeManager functionRuntimeManager;
+
+ private CollectorRegistry collectorRegistry = new CollectorRegistry();
+
+ private final Summary statWorkerStartupTime;
+ private final Gauge statNumInstances;
+ private final Summary scheduleTotalExecutionTime;
+ private final Summary scheduleStrategyExecutionTime;
+ private final Summary rebalanceTotalExecutionTime;
+ private final Summary rebalanceStrategyExecutionTime;
+ private final Summary stopInstanceProcessTime;
+ private final Summary startInstanceProcessTime;
+
+ // As an optimization
+ private final Summary.Child _statWorkerStartupTime;
+ private final Gauge.Child _statNumInstances;
+ private final Summary.Child _scheduleTotalExecutionTime;
+ private final Summary.Child _scheduleStrategyExecutionTime;
+ private final Summary.Child _rebalanceTotalExecutionTime;
+ private final Summary.Child _rebalanceStrategyExecutionTime;
+ private final Summary.Child _stopInstanceProcessTime;
+ private final Summary.Child _startInstanceProcessTime;
+
+ public WorkerStatsManager(WorkerConfig workerConfig) {
+
+ metricsLabels = new String[]{workerConfig.getPulsarFunctionsCluster()};
+
+ statWorkerStartupTime = Summary.build()
+ .name(PULSAR_FUNCTION_WORKER_METRICS_PREFIX + START_UP_TIME)
+ .help("Worker service startup time in milliseconds.")
+ .labelNames(metricsLabelNames)
+ .register(collectorRegistry);
+ _statWorkerStartupTime = statWorkerStartupTime.labels(metricsLabels);
+
+ statNumInstances = Gauge.build()
+ .name(PULSAR_FUNCTION_WORKER_METRICS_PREFIX + INSTANCE_COUNT)
+ .help("Number of instances run by this worker.")
+ .labelNames(metricsLabelNames)
+ .register(collectorRegistry);
+ _statNumInstances = statNumInstances.labels(metricsLabels);
+
+ scheduleTotalExecutionTime = Summary.build()
+ .name(PULSAR_FUNCTION_WORKER_METRICS_PREFIX + SCHEDULE_TOTAL_EXEC_TIME)
+ .help("Total execution time of schedule in milliseconds.")
+ .labelNames(metricsLabelNames)
+ .quantile(0.5, 0.01)
+ .quantile(0.9, 0.01)
+ .quantile(0.99, 0.01)
+ .quantile(0.999, 0.01)
+ .register(collectorRegistry);
+ _scheduleTotalExecutionTime =
scheduleTotalExecutionTime.labels(metricsLabels);
+
+ scheduleStrategyExecutionTime = Summary.build()
+ .name(PULSAR_FUNCTION_WORKER_METRICS_PREFIX +
SCHEDULE_STRATEGY_EXEC_TIME)
+ .help("Execution time of schedule strategy in milliseconds.")
+ .labelNames(metricsLabelNames)
+ .quantile(0.5, 0.01)
+ .quantile(0.9, 0.01)
+ .quantile(0.99, 0.01)
+ .quantile(0.999, 0.01)
+ .register(collectorRegistry);
+ _scheduleStrategyExecutionTime =
scheduleStrategyExecutionTime.labels(metricsLabels);
+
+ rebalanceTotalExecutionTime = Summary.build()
+ .name(PULSAR_FUNCTION_WORKER_METRICS_PREFIX + REBALANCE_TOTAL_EXEC_TIME)
+ .help("Total execution time of a rebalance in milliseconds.")
+ .labelNames(metricsLabelNames)
+ .quantile(0.5, 0.01)
+ .quantile(0.9, 0.01)
+ .quantile(0.99, 0.01)
+ .quantile(0.999, 0.01)
+ .register(collectorRegistry);
+ _rebalanceTotalExecutionTime =
rebalanceTotalExecutionTime.labels(metricsLabels);
+
+ rebalanceStrategyExecutionTime = Summary.build()
+ .name(PULSAR_FUNCTION_WORKER_METRICS_PREFIX +
REBALANCE_STRATEGY_EXEC_TIME)
+ .help("Execution time of rebalance strategy in milliseconds.")
+ .labelNames(metricsLabelNames)
+ .quantile(0.5, 0.01)
+ .quantile(0.9, 0.01)
+ .quantile(0.99, 0.01)
+ .quantile(0.999, 0.01)
+ .register(collectorRegistry);
+ _rebalanceStrategyExecutionTime =
rebalanceStrategyExecutionTime.labels(metricsLabels);
+
+ stopInstanceProcessTime = Summary.build()
+ .name(PULSAR_FUNCTION_WORKER_METRICS_PREFIX +
STOPPING_INSTANCE_PROCESS_TIME)
+ .help("Stopping instance process time in milliseconds.")
+ .labelNames(metricsLabelNames)
+ .quantile(0.5, 0.01)
+ .quantile(0.9, 0.01)
+ .quantile(0.99, 0.01)
+ .quantile(0.999, 0.01)
+ .register(collectorRegistry);
+ _stopInstanceProcessTime = stopInstanceProcessTime.labels(metricsLabels);
+
+ startInstanceProcessTime = Summary.build()
+ .name(PULSAR_FUNCTION_WORKER_METRICS_PREFIX +
STARTING_INSTANCE_PROCESS_TIME)
+ .help("Starting instance process time in milliseconds.")
+ .labelNames(metricsLabelNames)
+ .quantile(0.5, 0.01)
+ .quantile(0.9, 0.01)
+ .quantile(0.99, 0.01)
+ .quantile(0.999, 0.01)
+ .register(collectorRegistry);
+ _startInstanceProcessTime = startInstanceProcessTime.labels(metricsLabels);
+ }
+
+ private Long startupTimeStart;
+ public void startupTimeStart() {
+ startupTimeStart = System.nanoTime();
+ }
+
+ public void startupTimeEnd() {
+ if (startupTimeStart != null) {
+ double endTimeMs = ((double) System.nanoTime() - startupTimeStart) /
1.0E6D;
+ _statWorkerStartupTime.observe(endTimeMs);
+ }
+ }
+
+ private Long scheduleTotalExecTimeStart;
+ public void scheduleTotalExecTimeStart() {
+ scheduleTotalExecTimeStart = System.nanoTime();
+ }
+
+ public void scheduleTotalExecTimeEnd() {
+ if (scheduleTotalExecTimeStart != null) {
+ double endTimeMs = ((double) System.nanoTime() -
scheduleTotalExecTimeStart) / 1.0E6D;
+ _scheduleTotalExecutionTime.observe(endTimeMs);
+ }
+ }
+
+ private Long scheduleStrategyExecTimeStart;
+ public void scheduleStrategyExecTimeStartStart() {
+ scheduleStrategyExecTimeStart = System.nanoTime();
+ }
+
+ public void scheduleStrategyExecTimeStartEnd() {
+ if (scheduleStrategyExecTimeStart != null) {
+ double endTimeMs = ((double) System.nanoTime() -
scheduleStrategyExecTimeStart) / 1.0E6D;
+ _scheduleStrategyExecutionTime.observe(endTimeMs);
+ }
+ }
+
+ private Long rebalanceTotalExecTimeStart;
+ public void rebalanceTotalExecTimeStart() {
+ rebalanceTotalExecTimeStart = System.nanoTime();
+ }
+
+ public void rebalanceTotalExecTimeEnd() {
+ if (rebalanceTotalExecTimeStart != null) {
+ double endTimeMs = ((double) System.nanoTime() -
rebalanceTotalExecTimeStart) / 1.0E6D;
+ _rebalanceTotalExecutionTime.observe(endTimeMs);
+ }
+ }
+
+ private Long rebalanceStrategyExecTimeStart;
+ public void rebalanceStrategyExecTimeStart() {
+ rebalanceStrategyExecTimeStart = System.nanoTime();
+ }
+
+ public void rebalanceStrategyExecTimeEnd() {
+ if (rebalanceStrategyExecTimeStart != null) {
+ double endTimeMs = ((double) System.nanoTime() -
rebalanceStrategyExecTimeStart) / 1.0E6D;
+ _rebalanceStrategyExecutionTime.observe(endTimeMs);
+ }
+ }
+
+ private Long stopInstanceProcessTimeStart;
+ public void stopInstanceProcessTimeStart() {
+ stopInstanceProcessTimeStart = System.nanoTime();
+ }
+
+ public void stopInstanceProcessTimeEnd() {
+ if (stopInstanceProcessTimeStart != null) {
+ double endTimeMs = ((double) System.nanoTime() -
stopInstanceProcessTimeStart) / 1.0E6D;
+ _stopInstanceProcessTime.observe(endTimeMs);
+ }
+ }
+
+ private Long startInstanceProcessTimeStart;
+ public void startInstanceProcessTimeStart() {
+ startInstanceProcessTimeStart = System.nanoTime();
+ }
+
+ public void startInstanceProcessTimeEnd() {
+ if (startInstanceProcessTimeStart != null) {
+ double endTimeMs = ((double) System.nanoTime() -
startInstanceProcessTimeStart) / 1.0E6D;
+ _startInstanceProcessTime.observe(endTimeMs);
+ }
+ }
+
+ public String getStatsAsString() throws IOException {
+
+ _statNumInstances.set(functionRuntimeManager.getMyInstances());
+
+ StringWriter outputWriter = new StringWriter();
+
+ PrometheusTextFormat.write004(outputWriter,
collectorRegistry.metricFamilySamples());
+
+ return outputWriter.toString();
+ }
+}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
index 56ee46c..fe8b810 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
@@ -19,21 +19,8 @@
package org.apache.pulsar.functions.worker.rest;
import com.google.common.annotations.VisibleForTesting;
-
-import java.net.BindException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Optional;
-import java.util.TimeZone;
-
-import javax.servlet.DispatcherType;
-
-import io.prometheus.client.exporter.MetricsServlet;
+import io.prometheus.client.jetty.JettyStatisticsCollector;
import lombok.extern.slf4j.Slf4j;
-
import org.apache.pulsar.broker.web.AuthenticationFilter;
import org.apache.pulsar.broker.web.WebExecutorThreadPool;
import org.apache.pulsar.common.util.SecurityUtility;
@@ -49,6 +36,7 @@ import
org.eclipse.jetty.server.handler.ContextHandlerCollection;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.server.handler.RequestLogHandler;
+import org.eclipse.jetty.server.handler.StatisticsHandler;
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
@@ -56,6 +44,16 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.servlet.ServletContainer;
+import java.net.BindException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+
+import javax.servlet.DispatcherType;
+
@Slf4j
public class WorkerServer {
@@ -121,7 +119,17 @@ public class WorkerServer {
contexts.setHandlers(handlers.toArray(new Handler[handlers.size()]));
HandlerCollection handlerCollection = new HandlerCollection();
handlerCollection.setHandlers(new Handler[] { contexts, new
DefaultHandler(), requestLogHandler });
- server.setHandler(handlerCollection);
+
+ // Metrics handler
+ StatisticsHandler stats = new StatisticsHandler();
+ stats.setHandler(handlerCollection);
+ try {
+ new JettyStatisticsCollector(stats).register();
+ } catch (IllegalArgumentException e) {
+ // Already registered. Eg: in unit tests
+ }
+ handlers.add(stats);
+ server.setHandler(stats);
if (this.workerConfig.getWorkerPortTls() != null) {
try {
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index c7e8e41..a3b39ed 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -104,6 +104,7 @@ public class FunctionRuntimeManagerTest {
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
+ mock(WorkerStatsManager.class),
mock(ErrorNotifier.class)));
FunctionActioner functionActioner =
spy(functionRuntimeManager.getFunctionActioner());
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
@@ -188,6 +189,7 @@ public class FunctionRuntimeManagerTest {
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
+ mock(WorkerStatsManager.class),
mock(ErrorNotifier.class)));
FunctionActioner functionActioner =
spy(functionRuntimeManager.getFunctionActioner());
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
@@ -275,6 +277,7 @@ public class FunctionRuntimeManagerTest {
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
+ mock(WorkerStatsManager.class),
mock(ErrorNotifier.class));
FunctionActioner functionActioner =
spy(functionRuntimeManager.getFunctionActioner());
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
@@ -406,6 +409,7 @@ public class FunctionRuntimeManagerTest {
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
+ mock(WorkerStatsManager.class),
mock(ErrorNotifier.class));
FunctionActioner functionActioner =
spy(functionRuntimeManager.getFunctionActioner());
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
@@ -593,6 +597,7 @@ public class FunctionRuntimeManagerTest {
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
+ mock(WorkerStatsManager.class),
errorNotifier);
FunctionActioner functionActioner =
spy(functionRuntimeManager.getFunctionActioner());
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
@@ -665,6 +670,7 @@ public class FunctionRuntimeManagerTest {
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
+ mock(WorkerStatsManager.class),
mock(ErrorNotifier.class));
functionRuntimeManager.setFunctionActioner(functionActioner);
@@ -770,6 +776,7 @@ public class FunctionRuntimeManagerTest {
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
+ mock(WorkerStatsManager.class),
mock(ErrorNotifier.class));
fail();
@@ -795,6 +802,7 @@ public class FunctionRuntimeManagerTest {
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
+ mock(WorkerStatsManager.class),
mock(ErrorNotifier.class));
fail();
@@ -820,6 +828,7 @@ public class FunctionRuntimeManagerTest {
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
+ mock(WorkerStatsManager.class),
mock(ErrorNotifier.class));
fail();
@@ -845,6 +854,7 @@ public class FunctionRuntimeManagerTest {
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
+ mock(WorkerStatsManager.class),
mock(ErrorNotifier.class));
assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(),
ThreadRuntimeFactory.class);
@@ -875,6 +885,7 @@ public class FunctionRuntimeManagerTest {
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
+ mock(WorkerStatsManager.class),
mock(ErrorNotifier.class));
assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(),
KubernetesRuntimeFactory.class);
@@ -904,6 +915,7 @@ public class FunctionRuntimeManagerTest {
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
+ mock(WorkerStatsManager.class),
mock(ErrorNotifier.class));
assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(),
ProcessRuntimeFactory.class);
@@ -929,6 +941,7 @@ public class FunctionRuntimeManagerTest {
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
+ mock(WorkerStatsManager.class),
mock(ErrorNotifier.class));
assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(),
ThreadRuntimeFactory.class);
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index 71fe6b2..50012be 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -113,6 +113,7 @@ public class MembershipManagerTest {
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
functionMetaDataManager,
+ mock(WorkerStatsManager.class),
mock(ErrorNotifier.class)));
MembershipManager membershipManager = spy(new
MembershipManager(workerService, pulsarClient, pulsarAdmin));
@@ -187,6 +188,7 @@ public class MembershipManagerTest {
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
functionMetaDataManager,
+ mock(WorkerStatsManager.class),
mock(ErrorNotifier.class)));
MembershipManager membershipManager = spy(new
MembershipManager(workerService, mockPulsarClient(), pulsarAdmin));
@@ -276,6 +278,7 @@ public class MembershipManagerTest {
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
functionMetaDataManager,
+ mock(WorkerStatsManager.class),
mock(ErrorNotifier.class)));
MembershipManager membershipManager = spy(new
MembershipManager(workerService, mockPulsarClient(), pulsarAdmin));
@@ -357,6 +360,7 @@ public class MembershipManagerTest {
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
functionMetaDataManager,
+ mock(WorkerStatsManager.class),
mock(ErrorNotifier.class)));
MembershipManager membershipManager = spy(new
MembershipManager(workerService, mockPulsarClient(), pulsarAdmin));
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index ab59cfd..66174fb 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -127,7 +127,8 @@ public class SchedulerManagerTest {
this.executor = Executors
.newSingleThreadScheduledExecutor(new
DefaultThreadFactory("worker-test"));
errorNotifier = spy(ErrorNotifier.getDefaultImpl());
- schedulerManager = spy(new SchedulerManager(workerConfig,
pulsarClient, null, errorNotifier));
+ schedulerManager = spy(new SchedulerManager(workerConfig, pulsarClient,
+ null, mock(WorkerStatsManager.class), errorNotifier));
functionRuntimeManager = mock(FunctionRuntimeManager.class);
functionMetaDataManager = mock(FunctionMetaDataManager.class);
membershipManager = mock(MembershipManager.class);