This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 10c3511  Add worker specific system and jvm metrics (#2352)
10c3511 is described below

commit 10c351134a36457d3734a356b798b9f18b8d8593
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Fri Aug 10 00:21:55 2018 -0700

    Add worker specific system and jvm metrics (#2352)
---
 .../apache/pulsar/broker/admin/v2/WorkerStats.java | 11 ++++++-
 .../broker/loadbalance/impl/LoadManagerShared.java |  4 +--
 .../pulsar/broker/stats/MetricsGenerator.java      |  4 +--
 .../prometheus/PrometheusMetricsGenerator.java     |  4 +--
 .../stats/BookieClientsStatsGeneratorTest.java     |  2 +-
 .../apache/pulsar/client/admin/WorkerStats.java    |  9 ++++++
 .../client/admin/internal/WorkerStatsImpl.java     | 18 ++++++++++-
 .../pulsar/admin/cli/CmdFunctionWorkerStats.java   | 24 +++++++++++---
 .../apache/pulsar/common/stats}/JvmMetrics.java    | 32 +++++++++++++------
 .../org/apache/pulsar/common/stats/Metrics.java    |  5 +++
 .../pulsar/functions/worker/MetricsGenerator.java  | 37 ++++++++++++++--------
 .../pulsar/functions/worker/WorkerService.java     |  3 ++
 .../functions/worker/rest/api/FunctionsImpl.java   | 21 +++++++++++-
 .../functions/worker/rest/api/v2/WorkerStats.java  | 17 +++++++---
 14 files changed, 150 insertions(+), 41 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
index 962c483..5a0e4b7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.admin.v2;
 
 import java.io.IOException;
+import java.util.Collection;
 
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
@@ -41,7 +42,15 @@ public class WorkerStats extends FunctionApiResource {
     @ApiOperation(value = "Get metrics for all functions owned by worker", 
notes = "Requested should be executed by Monitoring agent on each worker to 
fetch the metrics", response = Metrics.class)
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 503, message = "Worker service is not 
running") })
-    public Response getMetrics() throws IOException {
+    public Response getStats() throws IOException {
         return functions.getFunctionsMetrcis(clientAppId());
     }
+    
+    @GET
+    @Path("/metrics")
+    @ApiOperation(value = "Gets the metrics for Monitoring", notes = "Request 
should be executed by Monitoring agent on each worker to fetch the 
worker-metrics", response = org.apache.pulsar.common.stats.Metrics.class, 
responseContainer = "List")
+    @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have 
admin permission") })
+    public Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() 
throws Exception {
+        return functions.getWorkerMetrcis(clientAppId());
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
index 80cb31c..6a5575c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
@@ -41,7 +41,7 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.loadbalance.BrokerHostUsage;
 import org.apache.pulsar.broker.loadbalance.LoadData;
-import org.apache.pulsar.broker.stats.metrics.JvmMetrics;
+import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.ServiceUnitId;
@@ -230,7 +230,7 @@ public class LoadManagerShared {
         systemResourceUsage.memory.limit = (double) maxHeapMemoryInBytes / 
MIBI;
 
         // Collect JVM direct memory
-        systemResourceUsage.directMemory.usage = (double) 
(JvmMetrics.getJvmDirectMemoryUsed() / MIBI);
+        systemResourceUsage.directMemory.usage = (double) 
(getJvmDirectMemoryUsed() / MIBI);
         systemResourceUsage.directMemory.limit = (double) 
(PlatformDependent.maxDirectMemory() / MIBI);
 
         return systemResourceUsage;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/MetricsGenerator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/MetricsGenerator.java
index 9472a7d..3aaa955 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/MetricsGenerator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/MetricsGenerator.java
@@ -24,7 +24,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.stats.metrics.JvmMetrics;
+import org.apache.pulsar.common.stats.JvmMetrics;
 import org.apache.pulsar.broker.stats.metrics.ManagedLedgerCacheMetrics;
 import org.apache.pulsar.broker.stats.metrics.ManagedLedgerMetrics;
 import org.apache.pulsar.common.stats.Metrics;
@@ -38,7 +38,7 @@ public class MetricsGenerator {
 
     public MetricsGenerator(PulsarService pulsar) {
         this.pulsar = pulsar;
-        this.jvmMetrics = new JvmMetrics(pulsar);
+        this.jvmMetrics = new JvmMetrics(pulsar.getExecutor(), "brk");
     }
 
     public Collection<Metrics> generate() {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index 8f0d30d..5c42b51 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -23,7 +23,7 @@ import java.io.OutputStream;
 import java.util.Enumeration;
 
 import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.stats.metrics.JvmMetrics;
+import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed;
 import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 import io.netty.buffer.ByteBuf;
@@ -51,7 +51,7 @@ public class PrometheusMetricsGenerator {
         Gauge.build("jvm_memory_direct_bytes_used", "-").create().setChild(new 
Child() {
             @Override
             public double get() {
-                return JvmMetrics.getJvmDirectMemoryUsed();
+                return getJvmDirectMemoryUsed();
             }
         }).register(CollectorRegistry.defaultRegistry);
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BookieClientsStatsGeneratorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BookieClientsStatsGeneratorTest.java
index 251ed57..332bc07 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BookieClientsStatsGeneratorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BookieClientsStatsGeneratorTest.java
@@ -25,7 +25,7 @@ import java.util.Map;
 import org.apache.bookkeeper.mledger.proto.PendingBookieOpsStats;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.stats.BookieClientStatsGenerator;
-import org.apache.pulsar.broker.stats.metrics.JvmMetrics;
+import org.apache.pulsar.common.stats.JvmMetrics;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/WorkerStats.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/WorkerStats.java
index 069e58f..4fc242f 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/WorkerStats.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/WorkerStats.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.admin;
 
+import java.util.Collection;
+
 import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
 
 /**
@@ -32,4 +34,11 @@ public interface WorkerStats {
      * @throws PulsarAdminException 
      */
     Metrics getFunctionsStats() throws PulsarAdminException;
+    
+    /**
+     * Get worker metrics.
+     * @return
+     * @throws PulsarAdminException
+     */
+    Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() throws 
PulsarAdminException;
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerStatsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerStatsImpl.java
index 5b6762c..f492d31 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerStatsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerStatsImpl.java
@@ -18,15 +18,20 @@
  */
 package org.apache.pulsar.client.admin.internal;
 
+import static org.apache.pulsar.client.admin.internal.FunctionsImpl.mergeJson;
+
+import java.util.Collection;
+import java.util.List;
+
 import javax.ws.rs.ClientErrorException;
 import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.GenericType;
 import javax.ws.rs.core.Response;
 
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.WorkerStats;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
-import static org.apache.pulsar.client.admin.internal.FunctionsImpl.mergeJson;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -55,4 +60,15 @@ public class WorkerStatsImpl extends BaseResource implements 
WorkerStats {
            throw getApiException(e);
        }
    }
+
+    @Override
+    public Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() 
throws PulsarAdminException {
+        try {
+            return request(workerStats.path("metrics"))
+                    .get(new 
GenericType<List<org.apache.pulsar.common.stats.Metrics>>() {
+                    });
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
 }
\ No newline at end of file
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorkerStats.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorkerStats.java
index 640d7d7..6bd22ae 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorkerStats.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorkerStats.java
@@ -24,6 +24,7 @@ import org.apache.pulsar.functions.utils.Utils;
 
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
+import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.JsonParser;
 
@@ -34,8 +35,6 @@ import lombok.extern.slf4j.Slf4j;
 @Parameters(commandDescription = "Operations to collect function-worker 
statistics")
 public class CmdFunctionWorkerStats extends CmdBase {
 
-    private final FunctionsStats functionsStats;
-
     /**
      * Base command
      */
@@ -70,10 +69,27 @@ public class CmdFunctionWorkerStats extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "dump metrics for Monitoring")
+    class CmdMonitoringMetrics extends BaseCommand {
+
+        @Parameter(names = { "-i", "--indent" }, description = "Indent JSON 
output", required = false)
+        boolean indent = false;
+
+        @Override
+        void runCmd() throws Exception {
+            String json = new Gson().toJson(admin.workerStats().getMetrics());
+            GsonBuilder gsonBuilder = new GsonBuilder();
+            if (indent) {
+                gsonBuilder.setPrettyPrinting();
+            }
+            System.out.println(gsonBuilder.create().toJson(new 
JsonParser().parse(json)));
+        }
+    }
+
     public CmdFunctionWorkerStats(PulsarAdmin admin) throws 
PulsarClientException {
         super("functions", admin);
-        functionsStats = new FunctionsStats();
-        jcommander.addCommand("functions", functionsStats);
+        jcommander.addCommand("functions", new FunctionsStats());
+        jcommander.addCommand("monitoring-metrics", new 
CmdMonitoringMetrics());
     }
 
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/JvmMetrics.java
 b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmMetrics.java
similarity index 87%
rename from 
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/JvmMetrics.java
rename to 
pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmMetrics.java
index d6870a5..6353dfa 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/JvmMetrics.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmMetrics.java
@@ -16,18 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.stats.metrics;
+package org.apache.pulsar.common.stats;
 
 import java.lang.management.ManagementFactory;
 import java.lang.reflect.Field;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 
-import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.common.stats.Metrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,7 +43,7 @@ import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.util.internal.PlatformDependent;
 import java.util.concurrent.atomic.AtomicLong;
 
-public class JvmMetrics extends AbstractMetrics {
+public class JvmMetrics {
 
     private volatile long accumulatedYoungGcCount = 0;
     private volatile long currentYoungGcCount = 0;
@@ -55,6 +57,8 @@ public class JvmMetrics extends AbstractMetrics {
     
     private static final Logger log = 
LoggerFactory.getLogger(JvmMetrics.class);
     private static Field directMemoryUsage = null;
+    
+    private final String componentName;
     static {
         try {
             directMemoryUsage = 
PlatformDependent.class.getDeclaredField("DIRECT_MEMORY_COUNTER");
@@ -64,13 +68,14 @@ public class JvmMetrics extends AbstractMetrics {
         }
     }
 
-    public JvmMetrics(PulsarService pulsar) {
-        super(pulsar);
-        pulsar.getExecutor().scheduleAtFixedRate(this::updateGcStats, 0, 1, 
TimeUnit.MINUTES);
+    public JvmMetrics(ScheduledExecutorService executor, String componentName) 
{
+        if (executor != null) {
+            executor.scheduleAtFixedRate(this::updateGcStats, 0, 1, 
TimeUnit.MINUTES);
+        }
+        this.componentName = componentName;
     }
 
     @SuppressWarnings("restriction")
-    @Override
     public List<Metrics> generate() {
 
         Metrics m = createMetrics();
@@ -105,8 +110,8 @@ public class JvmMetrics extends AbstractMetrics {
             }
         }
 
-        m.put("brk_default_pool_allocated", totalAllocated);
-        m.put("brk_default_pool_used", totalUsed);
+        m.put(this.componentName + "_default_pool_allocated", totalAllocated);
+        m.put(this.componentName + "_default_pool_used", totalUsed);
 
         return Lists.newArrayList(m);
     }
@@ -173,5 +178,14 @@ public class JvmMetrics extends AbstractMetrics {
 
         return parentThreadGroup.activeCount();
     }
+    
+    private Metrics createMetrics() {
+        return createMetrics(Collections.singletonMap("metric", 
"jvm_metrics"));
+    }
+
+    private Metrics createMetrics(Map<String, String> dimensionMap) {
+        // create with current version
+        return Metrics.create(dimensionMap);
+    }
 
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/Metrics.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/Metrics.java
index 0281da7..045acc1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/Metrics.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/Metrics.java
@@ -43,6 +43,11 @@ public class Metrics {
     @JsonInclude(content=Include.NON_EMPTY)
     final Map<String, String> dimensions;
 
+    public Metrics() {
+        metrics = Maps.newTreeMap();
+        dimensions = Maps.newHashMap();
+    }
+    
     // hide constructor
     protected Metrics(Map<String, String> unmodifiableDimensionMap) {
         this.metrics = Maps.newTreeMap();
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/WorkerStats.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MetricsGenerator.java
similarity index 53%
copy from 
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/WorkerStats.java
copy to 
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MetricsGenerator.java
index 069e58f..59e71d4 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/WorkerStats.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MetricsGenerator.java
@@ -16,20 +16,29 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.admin;
+package org.apache.pulsar.functions.worker;
 
-import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.pulsar.common.stats.JvmMetrics;
+import org.apache.pulsar.common.stats.Metrics;
+
+public class MetricsGenerator {
+
+    private final JvmMetrics jvmMetrics;
+
+    public MetricsGenerator(ScheduledExecutorService executor) {
+        this.jvmMetrics = new JvmMetrics(executor, "fun");
+    }
+
+    public List<Metrics> generate() {
+        List<Metrics> metricsCollection = new ArrayList<Metrics>();
+        metricsCollection.addAll(jvmMetrics.generate());
+        // add more metrics here..
+
+        return metricsCollection;
+    }
 
-/**
- * Admin interface for worker stats management.
- */
-public interface WorkerStats {
-    
-    
-    /**
-     * Get all functions stats on a worker
-     * @return
-     * @throws PulsarAdminException 
-     */
-    Metrics getFunctionsStats() throws PulsarAdminException;
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 6af9c8f..e4cf2cc 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.stats.JvmMetrics;
 
 /**
  * A service component contains everything to run a worker except rest server.
@@ -63,11 +64,13 @@ public class WorkerService {
     private AuthenticationService authenticationService;
     private ConnectorsManager connectorsManager;
     private PulsarAdmin admin;
+    private final MetricsGenerator metricsGenerator;
 
     public WorkerService(WorkerConfig workerConfig) {
         this.workerConfig = workerConfig;
         this.statsUpdater = Executors
                 .newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("worker-stats-updater"));
+        this.metricsGenerator = new MetricsGenerator(this.statsUpdater);
     }
 
     public void start(URI dlogUri) throws InterruptedException {
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index c35dd52..b71e6cd 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -60,6 +60,7 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.join;
+
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -976,7 +977,25 @@ public class FunctionsImpl {
     public boolean isSuperUser(String clientRole) {
         return clientRole != null && 
worker().getWorkerConfig().getSuperUserRoles().contains(clientRole);
     }
-    
+
+    public List<org.apache.pulsar.common.stats.Metrics> 
getWorkerMetrcis(String clientRole) throws IOException {
+        if (worker().getWorkerConfig().isAuthorizationEnabled() && 
!isSuperUser(clientRole)) {
+            log.error("Client [{}] is not admin and authorized to get 
function-stats", clientRole);
+            throw new 
WebApplicationException(Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(clientRole + " is not authorize to 
get metrics")).build());
+        }
+        return getWorkerMetrcis();
+    }
+
+    private List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrcis() {
+        if (!isWorkerServiceAvailable()) {
+            throw new WebApplicationException(
+                    
Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON)
+                            .entity(new ErrorData("Function worker service is 
not avaialable")).build());
+        }
+        return worker().getMetricsGenerator().generate();
+    }
+
     public Response getFunctionsMetrcis(String clientRole) throws IOException {
         if (worker().getWorkerConfig().isAuthorizationEnabled() && 
!isSuperUser(clientRole)) {
             log.error("Client [{}] is not admin and authorized to get 
function-stats", clientRole);
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStats.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStats.java
index 6e4ae55..146bb21 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStats.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStats.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.functions.worker.rest.api.v2;
 
 import java.io.IOException;
+import java.util.Collection;
 
 import javax.ws.rs.Consumes;
 import javax.ws.rs.GET;
@@ -27,6 +28,7 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
+import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
 import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
 
 import io.swagger.annotations.Api;
@@ -34,7 +36,6 @@ import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
 
 @Slf4j
 @Path("/worker-stats")
@@ -45,10 +46,18 @@ public class WorkerStats extends FunctionApiResource {
 
     @GET
     @Path("/functions")
-    @ApiOperation(value = "Get metrics for all functions owned by worker", 
notes = "Requested should be executed by Monitoring agent on each worker to 
fetch the metrics", response = Metrics.class)
-    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+    @ApiOperation(value = "Get stats for all functions owned by worker", notes 
= "Request should be executed by Monitoring agent on each worker to fetch the 
function-metrics", response = Metrics.class)
+    @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have 
admin permission"),
             @ApiResponse(code = 503, message = "Worker service is not 
running") })
-    public Response getMetrics() throws IOException {
+    public Response getStats() throws IOException {
         return functions.getFunctionsMetrcis(clientAppId());
     }
+    
+    @GET
+    @Path("/metrics")
+    @ApiOperation(value = "Gets the metrics for Monitoring", notes = "Request 
should be executed by Monitoring agent on each worker to fetch the 
worker-metrics", response = org.apache.pulsar.common.stats.Metrics.class, 
responseContainer = "List")
+    @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have 
admin permission") })
+    public Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() 
throws Exception {
+        return functions.getWorkerMetrcis(clientAppId());
+    }
 }

Reply via email to