This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e0f3cc1  Export Function worker internal stats via Prometheus (#7641)
e0f3cc1 is described below

commit e0f3cc1507b92700dece75c13fd97c0cfc2658a2
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Fri Jul 24 13:19:58 2020 -0700

    Export Function worker internal stats via Prometheus (#7641)
    
    Co-authored-by: Jerry Peng <[email protected]>
---
 pulsar-functions/worker/pom.xml                    |   5 +
 .../functions/worker/FunctionRuntimeManager.java   |  18 +-
 .../functions/worker/FunctionsStatsGenerator.java  |  12 +
 .../pulsar/functions/worker/SchedulerManager.java  |  22 +-
 .../pulsar/functions/worker/WorkerService.java     |  11 +-
 .../functions/worker/WorkerStatsManager.java       | 250 +++++++++++++++++++++
 .../pulsar/functions/worker/rest/WorkerServer.java |  38 ++--
 .../worker/FunctionRuntimeManagerTest.java         |  13 ++
 .../functions/worker/MembershipManagerTest.java    |   4 +
 .../functions/worker/SchedulerManagerTest.java     |   3 +-
 10 files changed, 353 insertions(+), 23 deletions(-)

diff --git a/pulsar-functions/worker/pom.xml b/pulsar-functions/worker/pom.xml
index 58589e9..78b33e2 100644
--- a/pulsar-functions/worker/pom.xml
+++ b/pulsar-functions/worker/pom.xml
@@ -144,6 +144,11 @@
       </exclusions>
     </dependency>
 
+    <dependency>
+      <groupId>io.prometheus</groupId>
+      <artifactId>simpleclient_jetty</artifactId>
+    </dependency>
+
     <!-- functions related dependencies (end) -->
 
     <dependency>
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 48c63af..852a2b2 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -141,11 +141,13 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
 
     private final FunctionMetaDataManager functionMetaDataManager;
 
+    private final WorkerStatsManager workerStatsManager;
+
     private final ErrorNotifier errorNotifier;
 
     public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService 
workerService, Namespace dlogNamespace,
                                   MembershipManager membershipManager, 
ConnectorsManager connectorsManager, FunctionsManager functionsManager,
-                                  FunctionMetaDataManager 
functionMetaDataManager, ErrorNotifier errorNotifier) throws Exception {
+                                  FunctionMetaDataManager 
functionMetaDataManager, WorkerStatsManager workerStatsManager, ErrorNotifier 
errorNotifier) throws Exception {
         this.workerConfig = workerConfig;
         this.workerService = workerService;
         this.functionAdmin = workerService.getFunctionAdmin();
@@ -213,6 +215,7 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
 
         this.membershipManager = membershipManager;
         this.functionMetaDataManager = functionMetaDataManager;
+        this.workerStatsManager = workerStatsManager;
         this.errorNotifier = errorNotifier;
     }
 
@@ -920,19 +923,32 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
 
     private void conditionallyStartFunction(FunctionRuntimeInfo 
functionRuntimeInfo) {
         if (!this.isInitializePhase) {
+            workerStatsManager.startInstanceProcessTimeStart(); // record System.nanoTime() just before the start action
             this.functionActioner.startFunction(functionRuntimeInfo);
+            workerStatsManager.startInstanceProcessTimeEnd(); // observe elapsed ms in the start_instance_process_time_ms summary
         }
     }
 
     private void conditionallyStopFunction(FunctionRuntimeInfo 
functionRuntimeInfo) {
         if (!this.isInitializePhase) {
+            workerStatsManager.stopInstanceProcessTimeStart(); // record System.nanoTime() just before the stop action
             this.functionActioner.stopFunction(functionRuntimeInfo);
+            workerStatsManager.stopInstanceProcessTimeEnd(); // observe elapsed ms in the stop_instance_process_time_ms summary
         }
     }
 
     private void conditionallyTerminateFunction(FunctionRuntimeInfo 
functionRuntimeInfo) {
         if (!this.isInitializePhase) {
+            workerStatsManager.stopInstanceProcessTimeStart(); // termination tears an instance down, so time it as a stop rather than a start
             this.functionActioner.terminateFunction(functionRuntimeInfo);
+            workerStatsManager.stopInstanceProcessTimeEnd(); // observe elapsed ms in the stop_instance_process_time_ms summary
         }
     }
+
+    /** Accessors read by WorkerStatsManager when it publishes worker metrics. **/
+
+    public int getMyInstances() { // number of assignments currently held under this worker's own id
+        Map<String, Assignment> mine = workerIdToAssignments.get(workerConfig.getWorkerId());
+        return (mine != null) ? mine.size() : 0; // no entry for this worker yet counts as zero instances
+    }
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
index 327d694..142caf6 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
@@ -38,6 +38,18 @@ public class FunctionsStatsGenerator {
     public static void generate(WorkerService workerService, String cluster, 
SimpleTextOutputStream out) {
         // only when worker service is initialized, we generate the stats. 
otherwise we will get bunch of NPE.
         if (workerService != null && workerService.isInitialized()) {
+
+            /* worker internal stats */
+
+            try {
+                
out.write(workerService.getWorkerStatsManager().getStatsAsString());
+            } catch (IOException e) {
+                log.warn("Encountered error when generating metrics for worker 
{}",
+                  workerService.getWorkerConfig().getWorkerId(), e);
+            }
+
+            /* function stats */
+
             // kubernetes runtime factory doesn't support stats collection 
through worker service
             if (workerService.getFunctionRuntimeManager().getRuntimeFactory() 
instanceof KubernetesRuntimeFactory) {
                 return;
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index 469256b..21961cf 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -81,6 +81,7 @@ public class SchedulerManager implements AutoCloseable {
 
     private final WorkerConfig workerConfig;
     private final ErrorNotifier errorNotifier;
+    private final WorkerStatsManager workerStatsManager;
     private ThreadPoolExecutor executorService;
     private final PulsarClient pulsarClient;
 
@@ -124,14 +125,15 @@ public class SchedulerManager implements AutoCloseable {
     public SchedulerManager(WorkerConfig workerConfig,
                             PulsarClient pulsarClient,
                             PulsarAdmin admin,
+                            WorkerStatsManager workerStatsManager,
                             ErrorNotifier errorNotifier) {
         this.workerConfig = workerConfig;
         this.pulsarClient = pulsarClient;
         this.admin = admin;
         this.scheduler = 
Reflections.createInstance(workerConfig.getSchedulerClassName(), 
IScheduler.class,
                 Thread.currentThread().getContextClassLoader());
+        this.workerStatsManager = workerStatsManager;
         this.errorNotifier = errorNotifier;
-        
     }
 
     private static Producer<byte[]> createProducer(PulsarClient client, 
WorkerConfig config) {
@@ -225,11 +227,19 @@ public class SchedulerManager implements AutoCloseable {
     }
 
     public Future<?> schedule() {
-        return scheduleInternal(() -> invokeScheduler(), "Encountered error 
when invoking scheduler");
+        return scheduleInternal(() -> {
+            workerStatsManager.scheduleTotalExecTimeStart();
+            invokeScheduler();
+            workerStatsManager.scheduleTotalExecTimeEnd();
+        }, "Encountered error when invoking scheduler");
     }
 
     private Future<?> rebalance() {
-        return scheduleInternal(() -> invokeRebalance(), "Encountered error 
when invoking rebalance");
+        return scheduleInternal(() -> {
+            workerStatsManager.rebalanceTotalExecTimeStart();
+            invokeRebalance();
+            workerStatsManager.rebalanceTotalExecTimeEnd();
+        }, "Encountered error when invoking rebalance");
     }
 
     public Future<?> rebalanceIfNotInprogress() {
@@ -325,7 +335,10 @@ public class SchedulerManager implements AutoCloseable {
         Pair<List<Function.Instance>, List<Assignment>> unassignedInstances 
                 = getUnassignedFunctionInstances(workerIdToAssignments, 
allInstances);
 
+        workerStatsManager.scheduleStrategyExecTimeStartStart();
         List<Assignment> assignments = 
scheduler.schedule(unassignedInstances.getLeft(), currentAssignments, 
currentMembership);
+        workerStatsManager.scheduleStrategyExecTimeStartEnd();
+
         assignments.addAll(unassignedInstances.getRight());
 
         if (log.isDebugEnabled()) {
@@ -379,8 +392,9 @@ public class SchedulerManager implements AutoCloseable {
                 .flatMap(stringMapEntry -> 
stringMapEntry.getValue().values().stream())
                 .collect(Collectors.toList());
 
-
+        workerStatsManager.rebalanceStrategyExecTimeStart();
         List<Assignment> rebalancedAssignments = 
scheduler.rebalance(currentAssignments, currentMembership);
+        workerStatsManager.rebalanceStrategyExecTimeEnd();
 
         for (Assignment assignment : rebalancedAssignments) {
             MessageId messageId = publishNewAssignment(assignment, false);
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 5b493bc..94d3fbd 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -74,19 +74,22 @@ public class WorkerService {
     private URI dlogUri;
     private LeaderService leaderService;
     private FunctionAssignmentTailer functionAssignmentTailer;
+    private final WorkerStatsManager workerStatsManager;
 
     public WorkerService(WorkerConfig workerConfig) {
         this.workerConfig = workerConfig;
         this.statsUpdater = Executors
                 .newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("worker-stats-updater"));
         this.metricsGenerator = new MetricsGenerator(this.statsUpdater, 
workerConfig);
+        workerStatsManager = new WorkerStatsManager(workerConfig);
     }
 
-
     public void start(URI dlogUri,
                       AuthenticationService authenticationService,
                       AuthorizationService authorizationService,
                       ErrorNotifier errorNotifier) throws InterruptedException 
{
+
+        workerStatsManager.startupTimeStart();
         log.info("/** Starting worker id={} **/", workerConfig.getWorkerId());
 
         try {
@@ -163,7 +166,7 @@ public class WorkerService {
             
brokerAdmin.topics().createNonPartitionedTopic(workerConfig.getClusterCoordinationTopic());
             
brokerAdmin.topics().createNonPartitionedTopic(workerConfig.getFunctionMetadataTopic());
             //create scheduler manager
-            this.schedulerManager = new SchedulerManager(workerConfig, client, 
brokerAdmin, errorNotifier);
+            this.schedulerManager = new SchedulerManager(workerConfig, client, 
brokerAdmin, workerStatsManager, errorNotifier);
 
             //create function meta data manager
             this.functionMetaDataManager = new FunctionMetaDataManager(
@@ -188,6 +191,7 @@ public class WorkerService {
                     connectorsManager,
                     functionsManager,
                     functionMetaDataManager,
+                    workerStatsManager,
                     errorNotifier);
 
 
@@ -278,6 +282,9 @@ public class WorkerService {
             this.isInitialized = true;
 
             log.info("/** Started worker id={} **/", 
workerConfig.getWorkerId());
+
+            
workerStatsManager.setFunctionRuntimeManager(functionRuntimeManager);
+            workerStatsManager.startupTimeEnd();
         } catch (Throwable t) {
             log.error("Error Starting up in worker", t);
             throw new RuntimeException(t);
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
new file mode 100644
index 0000000..a14039e
--- /dev/null
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.worker;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Gauge;
+import io.prometheus.client.Summary;
+import lombok.Setter;
+import org.apache.pulsar.functions.instance.stats.PrometheusTextFormat;
+
+import java.io.IOException;
+import java.io.StringWriter;
+
+public class WorkerStatsManager {
+
+  private static final String PULSAR_FUNCTION_WORKER_METRICS_PREFIX = 
"pulsar_function_worker_";
+  private static final String START_UP_TIME = "start_up_time_ms";
+  private static final String INSTANCE_COUNT = "instance_count";
+  private static final String SCHEDULE_TOTAL_EXEC_TIME = 
"schedule_execution_time_total_ms";
+  private static final String SCHEDULE_STRATEGY_EXEC_TIME = 
"schedule_strategy_execution_time_ms";
+  private static final String REBALANCE_TOTAL_EXEC_TIME = 
"rebalance_execution_time_total_ms";
+  private static final String REBALANCE_STRATEGY_EXEC_TIME = 
"rebalance_strategy_execution_time_ms";
+  private static final String STOPPING_INSTANCE_PROCESS_TIME = 
"stop_instance_process_time_ms";
+  private static final String UPDATING_INSTANCE_PROCESS_TIME = 
"update_instance_process_time_ms";
+  private static final String STARTING_INSTANCE_PROCESS_TIME = 
"start_instance_process_time_ms";
+
+  private static final String[] metricsLabelNames = {"cluster"};
+  private final String[] metricsLabels;
+
+  @Setter
+  private FunctionRuntimeManager functionRuntimeManager;
+
+  private CollectorRegistry collectorRegistry = new CollectorRegistry();
+
+  private final Summary statWorkerStartupTime;
+  private final Gauge statNumInstances;
+  private final Summary scheduleTotalExecutionTime;
+  private final Summary scheduleStrategyExecutionTime;
+  private final Summary rebalanceTotalExecutionTime;
+  private final Summary rebalanceStrategyExecutionTime;
+  private final Summary stopInstanceProcessTime;
+  private final Summary startInstanceProcessTime;
+
+  // Label-bound children, resolved once in the constructor so each observation skips the labels() lookup
+  private final Summary.Child _statWorkerStartupTime;
+  private final Gauge.Child _statNumInstances;
+  private final Summary.Child _scheduleTotalExecutionTime;
+  private final Summary.Child _scheduleStrategyExecutionTime;
+  private final Summary.Child _rebalanceTotalExecutionTime;
+  private final Summary.Child _rebalanceStrategyExecutionTime;
+  private final Summary.Child _stopInstanceProcessTime;
+  private final Summary.Child _startInstanceProcessTime;
+
+  public WorkerStatsManager(WorkerConfig workerConfig) {
+
+    metricsLabels = new String[]{workerConfig.getPulsarFunctionsCluster()};
+
+    statWorkerStartupTime = Summary.build()
+      .name(PULSAR_FUNCTION_WORKER_METRICS_PREFIX + START_UP_TIME)
+      .help("Worker service startup time in milliseconds.")
+      .labelNames(metricsLabelNames)
+      .register(collectorRegistry);
+    _statWorkerStartupTime = statWorkerStartupTime.labels(metricsLabels);
+
+    statNumInstances = Gauge.build()
+      .name(PULSAR_FUNCTION_WORKER_METRICS_PREFIX + INSTANCE_COUNT)
+      .help("Number of instances run by this worker.")
+      .labelNames(metricsLabelNames)
+      .register(collectorRegistry);
+    _statNumInstances = statNumInstances.labels(metricsLabels);
+
+    scheduleTotalExecutionTime = Summary.build()
+      .name(PULSAR_FUNCTION_WORKER_METRICS_PREFIX + SCHEDULE_TOTAL_EXEC_TIME)
+      .help("Total execution time of schedule in milliseconds.")
+      .labelNames(metricsLabelNames)
+      .quantile(0.5, 0.01)
+      .quantile(0.9, 0.01)
+      .quantile(0.99, 0.01)
+      .quantile(0.999, 0.01)
+      .register(collectorRegistry);
+    _scheduleTotalExecutionTime = 
scheduleTotalExecutionTime.labels(metricsLabels);
+
+    scheduleStrategyExecutionTime = Summary.build()
+      .name(PULSAR_FUNCTION_WORKER_METRICS_PREFIX + 
SCHEDULE_STRATEGY_EXEC_TIME)
+      .help("Execution time of schedule strategy in milliseconds.")
+      .labelNames(metricsLabelNames)
+      .quantile(0.5, 0.01)
+      .quantile(0.9, 0.01)
+      .quantile(0.99, 0.01)
+      .quantile(0.999, 0.01)
+      .register(collectorRegistry);
+    _scheduleStrategyExecutionTime = 
scheduleStrategyExecutionTime.labels(metricsLabels);
+
+    rebalanceTotalExecutionTime = Summary.build()
+      .name(PULSAR_FUNCTION_WORKER_METRICS_PREFIX + REBALANCE_TOTAL_EXEC_TIME)
+      .help("Total execution time of a rebalance in milliseconds.")
+      .labelNames(metricsLabelNames)
+      .quantile(0.5, 0.01)
+      .quantile(0.9, 0.01)
+      .quantile(0.99, 0.01)
+      .quantile(0.999, 0.01)
+      .register(collectorRegistry);
+    _rebalanceTotalExecutionTime = 
rebalanceTotalExecutionTime.labels(metricsLabels);
+
+    rebalanceStrategyExecutionTime = Summary.build()
+      .name(PULSAR_FUNCTION_WORKER_METRICS_PREFIX + 
REBALANCE_STRATEGY_EXEC_TIME)
+      .help("Execution time of rebalance strategy in milliseconds.")
+      .labelNames(metricsLabelNames)
+      .quantile(0.5, 0.01)
+      .quantile(0.9, 0.01)
+      .quantile(0.99, 0.01)
+      .quantile(0.999, 0.01)
+      .register(collectorRegistry);
+    _rebalanceStrategyExecutionTime = 
rebalanceStrategyExecutionTime.labels(metricsLabels);
+
+    stopInstanceProcessTime = Summary.build()
+      .name(PULSAR_FUNCTION_WORKER_METRICS_PREFIX + 
STOPPING_INSTANCE_PROCESS_TIME)
+      .help("Stopping instance process time in milliseconds.")
+      .labelNames(metricsLabelNames)
+      .quantile(0.5, 0.01)
+      .quantile(0.9, 0.01)
+      .quantile(0.99, 0.01)
+      .quantile(0.999, 0.01)
+      .register(collectorRegistry);
+    _stopInstanceProcessTime = stopInstanceProcessTime.labels(metricsLabels);
+
+    startInstanceProcessTime = Summary.build()
+      .name(PULSAR_FUNCTION_WORKER_METRICS_PREFIX + 
STARTING_INSTANCE_PROCESS_TIME)
+      .help("Starting instance process time in milliseconds.")
+      .labelNames(metricsLabelNames)
+      .quantile(0.5, 0.01)
+      .quantile(0.9, 0.01)
+      .quantile(0.99, 0.01)
+      .quantile(0.999, 0.01)
+      .register(collectorRegistry);
+    _startInstanceProcessTime = startInstanceProcessTime.labels(metricsLabels);
+  }
+
+  private Long startupTimeStart;
+  public void startupTimeStart() {
+    startupTimeStart = System.nanoTime();
+  }
+
+  public void startupTimeEnd() {
+    if (startupTimeStart != null) {
+      double endTimeMs = ((double) System.nanoTime() - startupTimeStart) / 
1.0E6D;
+      _statWorkerStartupTime.observe(endTimeMs);
+    }
+  }
+
+  private Long scheduleTotalExecTimeStart;
+  public void scheduleTotalExecTimeStart() {
+    scheduleTotalExecTimeStart = System.nanoTime();
+  }
+
+  public void scheduleTotalExecTimeEnd() {
+    if (scheduleTotalExecTimeStart != null) {
+      double endTimeMs = ((double) System.nanoTime() - 
scheduleTotalExecTimeStart) / 1.0E6D;
+      _scheduleTotalExecutionTime.observe(endTimeMs);
+    }
+  }
+
+  private Long scheduleStrategyExecTimeStart;
+  public void scheduleStrategyExecTimeStartStart() {
+    scheduleStrategyExecTimeStart = System.nanoTime();
+  }
+
+  public void scheduleStrategyExecTimeStartEnd() {
+    if (scheduleStrategyExecTimeStart != null) {
+      double endTimeMs = ((double) System.nanoTime() - 
scheduleStrategyExecTimeStart) / 1.0E6D;
+      _scheduleStrategyExecutionTime.observe(endTimeMs);
+    }
+  }
+
+  private Long rebalanceTotalExecTimeStart;
+  public void rebalanceTotalExecTimeStart() {
+    rebalanceTotalExecTimeStart = System.nanoTime();
+  }
+
+  public void rebalanceTotalExecTimeEnd() {
+    if (rebalanceTotalExecTimeStart != null) {
+      double endTimeMs = ((double) System.nanoTime() - 
rebalanceTotalExecTimeStart) / 1.0E6D;
+      _rebalanceTotalExecutionTime.observe(endTimeMs);
+    }
+  }
+
+  private Long rebalanceStrategyExecTimeStart;
+  public void rebalanceStrategyExecTimeStart() {
+    rebalanceStrategyExecTimeStart = System.nanoTime();
+  }
+
+  public void rebalanceStrategyExecTimeEnd() {
+    if (rebalanceStrategyExecTimeStart != null) {
+      double endTimeMs = ((double) System.nanoTime() - 
rebalanceStrategyExecTimeStart) / 1.0E6D;
+      _rebalanceStrategyExecutionTime.observe(endTimeMs);
+    }
+  }
+
+  private Long stopInstanceProcessTimeStart;
+  public void stopInstanceProcessTimeStart() {
+    stopInstanceProcessTimeStart = System.nanoTime();
+  }
+
+  public void stopInstanceProcessTimeEnd() {
+    if (stopInstanceProcessTimeStart != null) {
+      double endTimeMs = ((double) System.nanoTime() - 
stopInstanceProcessTimeStart) / 1.0E6D;
+      _stopInstanceProcessTime.observe(endTimeMs);
+    }
+  }
+
+  private Long startInstanceProcessTimeStart;
+  public void startInstanceProcessTimeStart() {
+    startInstanceProcessTimeStart = System.nanoTime();
+  }
+
+  public void startInstanceProcessTimeEnd() {
+    if (startInstanceProcessTimeStart != null) {
+      double endTimeMs = ((double) System.nanoTime() - 
startInstanceProcessTimeStart) / 1.0E6D;
+      _startInstanceProcessTime.observe(endTimeMs);
+    }
+  }
+
+  public String getStatsAsString() throws IOException {
+    // functionRuntimeManager is injected through its setter after construction; until then report 0 instead of throwing an NPE.
+    FunctionRuntimeManager runtimeManager = functionRuntimeManager;
+    _statNumInstances.set(runtimeManager == null ? 0 : runtimeManager.getMyInstances());
+    StringWriter outputWriter = new StringWriter();
+
+    // Serialize every collector registered in this manager's private registry via PrometheusTextFormat.write004.
+    PrometheusTextFormat.write004(outputWriter, collectorRegistry.metricFamilySamples());
+    return outputWriter.toString();
+  }
+}
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
index 56ee46c..fe8b810 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
@@ -19,21 +19,8 @@
 package org.apache.pulsar.functions.worker.rest;
 
 import com.google.common.annotations.VisibleForTesting;
-
-import java.net.BindException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Optional;
-import java.util.TimeZone;
-
-import javax.servlet.DispatcherType;
-
-import io.prometheus.client.exporter.MetricsServlet;
+import io.prometheus.client.jetty.JettyStatisticsCollector;
 import lombok.extern.slf4j.Slf4j;
-
 import org.apache.pulsar.broker.web.AuthenticationFilter;
 import org.apache.pulsar.broker.web.WebExecutorThreadPool;
 import org.apache.pulsar.common.util.SecurityUtility;
@@ -49,6 +36,7 @@ import 
org.eclipse.jetty.server.handler.ContextHandlerCollection;
 import org.eclipse.jetty.server.handler.DefaultHandler;
 import org.eclipse.jetty.server.handler.HandlerCollection;
 import org.eclipse.jetty.server.handler.RequestLogHandler;
+import org.eclipse.jetty.server.handler.StatisticsHandler;
 import org.eclipse.jetty.servlet.FilterHolder;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
@@ -56,6 +44,16 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
 import org.glassfish.jersey.server.ResourceConfig;
 import org.glassfish.jersey.servlet.ServletContainer;
 
+import java.net.BindException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+
+import javax.servlet.DispatcherType;
+
 @Slf4j
 public class WorkerServer {
 
@@ -121,7 +119,17 @@ public class WorkerServer {
         contexts.setHandlers(handlers.toArray(new Handler[handlers.size()]));
         HandlerCollection handlerCollection = new HandlerCollection();
         handlerCollection.setHandlers(new Handler[] { contexts, new 
DefaultHandler(), requestLogHandler });
-        server.setHandler(handlerCollection);
+
+        // Metrics handler
+        StatisticsHandler stats = new StatisticsHandler();
+        stats.setHandler(handlerCollection);
+        try {
+            new JettyStatisticsCollector(stats).register();
+        } catch (IllegalArgumentException e) {
+            // Already registered. Eg: in unit tests
+        }
+        handlers.add(stats);
+        server.setHandler(stats);
 
         if (this.workerConfig.getWorkerPortTls() != null) {
             try {
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index c7e8e41..a3b39ed 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -104,6 +104,7 @@ public class FunctionRuntimeManagerTest {
                 mock(ConnectorsManager.class),
                 mock(FunctionsManager.class),
                 mock(FunctionMetaDataManager.class),
+                mock(WorkerStatsManager.class),
                 mock(ErrorNotifier.class)));
         FunctionActioner functionActioner = 
spy(functionRuntimeManager.getFunctionActioner());
         
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
@@ -188,6 +189,7 @@ public class FunctionRuntimeManagerTest {
                 mock(ConnectorsManager.class),
                 mock(FunctionsManager.class),
                 mock(FunctionMetaDataManager.class),
+                mock(WorkerStatsManager.class),
                 mock(ErrorNotifier.class)));
         FunctionActioner functionActioner = 
spy(functionRuntimeManager.getFunctionActioner());
         
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
@@ -275,6 +277,7 @@ public class FunctionRuntimeManagerTest {
                 mock(ConnectorsManager.class),
                 mock(FunctionsManager.class),
                 mock(FunctionMetaDataManager.class),
+                mock(WorkerStatsManager.class),
                 mock(ErrorNotifier.class));
         FunctionActioner functionActioner = 
spy(functionRuntimeManager.getFunctionActioner());
         
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
@@ -406,6 +409,7 @@ public class FunctionRuntimeManagerTest {
                 mock(ConnectorsManager.class),
                 mock(FunctionsManager.class),
                 mock(FunctionMetaDataManager.class),
+                mock(WorkerStatsManager.class),
                 mock(ErrorNotifier.class));
         FunctionActioner functionActioner = 
spy(functionRuntimeManager.getFunctionActioner());
         
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
@@ -593,6 +597,7 @@ public class FunctionRuntimeManagerTest {
                 mock(ConnectorsManager.class),
                 mock(FunctionsManager.class),
                 mock(FunctionMetaDataManager.class),
+                mock(WorkerStatsManager.class),
                 errorNotifier);
         FunctionActioner functionActioner = 
spy(functionRuntimeManager.getFunctionActioner());
         
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
@@ -665,6 +670,7 @@ public class FunctionRuntimeManagerTest {
                 mock(ConnectorsManager.class),
                 mock(FunctionsManager.class),
                 mock(FunctionMetaDataManager.class),
+                mock(WorkerStatsManager.class),
                 mock(ErrorNotifier.class));
         functionRuntimeManager.setFunctionActioner(functionActioner);
 
@@ -770,6 +776,7 @@ public class FunctionRuntimeManagerTest {
                     mock(ConnectorsManager.class),
                     mock(FunctionsManager.class),
                     mock(FunctionMetaDataManager.class),
+                    mock(WorkerStatsManager.class),
                     mock(ErrorNotifier.class));
 
             fail();
@@ -795,6 +802,7 @@ public class FunctionRuntimeManagerTest {
                     mock(ConnectorsManager.class),
                     mock(FunctionsManager.class),
                     mock(FunctionMetaDataManager.class),
+                    mock(WorkerStatsManager.class),
                     mock(ErrorNotifier.class));
 
             fail();
@@ -820,6 +828,7 @@ public class FunctionRuntimeManagerTest {
                     mock(ConnectorsManager.class),
                     mock(FunctionsManager.class),
                     mock(FunctionMetaDataManager.class),
+                    mock(WorkerStatsManager.class),
                     mock(ErrorNotifier.class));
 
             fail();
@@ -845,6 +854,7 @@ public class FunctionRuntimeManagerTest {
                     mock(ConnectorsManager.class),
                     mock(FunctionsManager.class),
                     mock(FunctionMetaDataManager.class),
+                    mock(WorkerStatsManager.class),
                     mock(ErrorNotifier.class));
 
             
assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), 
ThreadRuntimeFactory.class);
@@ -875,6 +885,7 @@ public class FunctionRuntimeManagerTest {
                 mock(ConnectorsManager.class),
                 mock(FunctionsManager.class),
                 mock(FunctionMetaDataManager.class),
+                mock(WorkerStatsManager.class),
                 mock(ErrorNotifier.class));
 
         assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), 
KubernetesRuntimeFactory.class);
@@ -904,6 +915,7 @@ public class FunctionRuntimeManagerTest {
                 mock(ConnectorsManager.class),
                 mock(FunctionsManager.class),
                 mock(FunctionMetaDataManager.class),
+                mock(WorkerStatsManager.class),
                 mock(ErrorNotifier.class));
 
         assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), 
ProcessRuntimeFactory.class);
@@ -929,6 +941,7 @@ public class FunctionRuntimeManagerTest {
                 mock(ConnectorsManager.class),
                 mock(FunctionsManager.class),
                 mock(FunctionMetaDataManager.class),
+                mock(WorkerStatsManager.class),
                 mock(ErrorNotifier.class));
 
         assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), 
ThreadRuntimeFactory.class);
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index 71fe6b2..50012be 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -113,6 +113,7 @@ public class MembershipManagerTest {
                 mock(ConnectorsManager.class),
                 mock(FunctionsManager.class),
                 functionMetaDataManager,
+                mock(WorkerStatsManager.class),
                 mock(ErrorNotifier.class)));
         MembershipManager membershipManager = spy(new 
MembershipManager(workerService, pulsarClient, pulsarAdmin));
 
@@ -187,6 +188,7 @@ public class MembershipManagerTest {
                 mock(ConnectorsManager.class),
                 mock(FunctionsManager.class),
                 functionMetaDataManager,
+                mock(WorkerStatsManager.class),
                 mock(ErrorNotifier.class)));
 
         MembershipManager membershipManager = spy(new 
MembershipManager(workerService, mockPulsarClient(), pulsarAdmin));
@@ -276,6 +278,7 @@ public class MembershipManagerTest {
                 mock(ConnectorsManager.class),
                 mock(FunctionsManager.class),
                 functionMetaDataManager,
+                mock(WorkerStatsManager.class),
                 mock(ErrorNotifier.class)));
         MembershipManager membershipManager = spy(new 
MembershipManager(workerService, mockPulsarClient(), pulsarAdmin));
 
@@ -357,6 +360,7 @@ public class MembershipManagerTest {
                 mock(ConnectorsManager.class),
                 mock(FunctionsManager.class),
                 functionMetaDataManager,
+                mock(WorkerStatsManager.class),
                 mock(ErrorNotifier.class)));
         MembershipManager membershipManager = spy(new 
MembershipManager(workerService, mockPulsarClient(), pulsarAdmin));
 
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index ab59cfd..66174fb 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -127,7 +127,8 @@ public class SchedulerManagerTest {
         this.executor = Executors
                 .newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("worker-test"));
         errorNotifier = spy(ErrorNotifier.getDefaultImpl());
-        schedulerManager = spy(new SchedulerManager(workerConfig, 
pulsarClient, null, errorNotifier));
+        schedulerManager = spy(new SchedulerManager(workerConfig, pulsarClient,
+          null, mock(WorkerStatsManager.class), errorNotifier));
         functionRuntimeManager = mock(FunctionRuntimeManager.class);
         functionMetaDataManager = mock(FunctionMetaDataManager.class);
         membershipManager = mock(MembershipManager.class);

Reply via email to