rdhabalia closed pull request #2352: Add worker specific system and jvm metrics
URL: https://github.com/apache/incubator-pulsar/pull/2352
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
index 962c4835d6..5a0e4b7b61 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.admin.v2;
 
 import java.io.IOException;
+import java.util.Collection;
 
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
@@ -41,7 +42,15 @@
     @ApiOperation(value = "Get metrics for all functions owned by worker", 
notes = "Requested should be executed by Monitoring agent on each worker to 
fetch the metrics", response = Metrics.class)
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 503, message = "Worker service is not 
running") })
-    public Response getMetrics() throws IOException {
+    public Response getStats() throws IOException {
         return functions.getFunctionsMetrcis(clientAppId());
     }
+    
+    @GET
+    @Path("/metrics")
+    @ApiOperation(value = "Gets the metrics for Monitoring", notes = "Request 
should be executed by Monitoring agent on each worker to fetch the 
worker-metrics", response = org.apache.pulsar.common.stats.Metrics.class, 
responseContainer = "List")
+    @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have 
admin permission") })
+    public Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() 
throws Exception {
+        return functions.getWorkerMetrcis(clientAppId());
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
index 80cb31cb10..6a5575c02d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
@@ -41,7 +41,7 @@
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.loadbalance.BrokerHostUsage;
 import org.apache.pulsar.broker.loadbalance.LoadData;
-import org.apache.pulsar.broker.stats.metrics.JvmMetrics;
+import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.ServiceUnitId;
@@ -230,7 +230,7 @@ public static SystemResourceUsage 
getSystemResourceUsage(final BrokerHostUsage b
         systemResourceUsage.memory.limit = (double) maxHeapMemoryInBytes / 
MIBI;
 
         // Collect JVM direct memory
-        systemResourceUsage.directMemory.usage = (double) 
(JvmMetrics.getJvmDirectMemoryUsed() / MIBI);
+        systemResourceUsage.directMemory.usage = (double) 
(getJvmDirectMemoryUsed() / MIBI);
         systemResourceUsage.directMemory.limit = (double) 
(PlatformDependent.maxDirectMemory() / MIBI);
 
         return systemResourceUsage;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/MetricsGenerator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/MetricsGenerator.java
index 9472a7dc61..3aaa955463 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/MetricsGenerator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/MetricsGenerator.java
@@ -24,7 +24,7 @@
 import java.util.Map;
 
 import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.stats.metrics.JvmMetrics;
+import org.apache.pulsar.common.stats.JvmMetrics;
 import org.apache.pulsar.broker.stats.metrics.ManagedLedgerCacheMetrics;
 import org.apache.pulsar.broker.stats.metrics.ManagedLedgerMetrics;
 import org.apache.pulsar.common.stats.Metrics;
@@ -38,7 +38,7 @@
 
     public MetricsGenerator(PulsarService pulsar) {
         this.pulsar = pulsar;
-        this.jvmMetrics = new JvmMetrics(pulsar);
+        this.jvmMetrics = new JvmMetrics(pulsar.getExecutor(), "brk");
     }
 
     public Collection<Metrics> generate() {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index 8f0d30d56a..5c42b51328 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -23,7 +23,7 @@
 import java.util.Enumeration;
 
 import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.stats.metrics.JvmMetrics;
+import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed;
 import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 import io.netty.buffer.ByteBuf;
@@ -51,7 +51,7 @@
         Gauge.build("jvm_memory_direct_bytes_used", "-").create().setChild(new 
Child() {
             @Override
             public double get() {
-                return JvmMetrics.getJvmDirectMemoryUsed();
+                return getJvmDirectMemoryUsed();
             }
         }).register(CollectorRegistry.defaultRegistry);
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BookieClientsStatsGeneratorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BookieClientsStatsGeneratorTest.java
index 251ed57940..332bc07889 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BookieClientsStatsGeneratorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BookieClientsStatsGeneratorTest.java
@@ -25,7 +25,7 @@
 import org.apache.bookkeeper.mledger.proto.PendingBookieOpsStats;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.stats.BookieClientStatsGenerator;
-import org.apache.pulsar.broker.stats.metrics.JvmMetrics;
+import org.apache.pulsar.common.stats.JvmMetrics;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/WorkerStats.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/WorkerStats.java
index 069e58ff6e..4fc242f1ed 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/WorkerStats.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/WorkerStats.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.admin;
 
+import java.util.Collection;
+
 import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
 
 /**
@@ -32,4 +34,11 @@
      * @throws PulsarAdminException 
      */
     Metrics getFunctionsStats() throws PulsarAdminException;
+    
+    /**
+     * Get worker metrics.
+     * @return
+     * @throws PulsarAdminException
+     */
+    Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() throws 
PulsarAdminException;
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerStatsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerStatsImpl.java
index 5b6762c5a1..f492d31125 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerStatsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerStatsImpl.java
@@ -18,15 +18,20 @@
  */
 package org.apache.pulsar.client.admin.internal;
 
+import static org.apache.pulsar.client.admin.internal.FunctionsImpl.mergeJson;
+
+import java.util.Collection;
+import java.util.List;
+
 import javax.ws.rs.ClientErrorException;
 import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.GenericType;
 import javax.ws.rs.core.Response;
 
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.WorkerStats;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
-import static org.apache.pulsar.client.admin.internal.FunctionsImpl.mergeJson;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -55,4 +60,15 @@ public Metrics getFunctionsStats() throws 
PulsarAdminException {
            throw getApiException(e);
        }
    }
+
+    @Override
+    public Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() 
throws PulsarAdminException {
+        try {
+            return request(workerStats.path("metrics"))
+                    .get(new 
GenericType<List<org.apache.pulsar.common.stats.Metrics>>() {
+                    });
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
 }
\ No newline at end of file
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorkerStats.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorkerStats.java
index 640d7d7c1c..6bd22ae2c7 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorkerStats.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorkerStats.java
@@ -24,6 +24,7 @@
 
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
+import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.JsonParser;
 
@@ -34,8 +35,6 @@
 @Parameters(commandDescription = "Operations to collect function-worker 
statistics")
 public class CmdFunctionWorkerStats extends CmdBase {
 
-    private final FunctionsStats functionsStats;
-
     /**
      * Base command
      */
@@ -70,10 +69,27 @@ void runCmd() throws Exception {
         }
     }
 
+    @Parameters(commandDescription = "dump metrics for Monitoring")
+    class CmdMonitoringMetrics extends BaseCommand {
+
+        @Parameter(names = { "-i", "--indent" }, description = "Indent JSON 
output", required = false)
+        boolean indent = false;
+
+        @Override
+        void runCmd() throws Exception {
+            String json = new Gson().toJson(admin.workerStats().getMetrics());
+            GsonBuilder gsonBuilder = new GsonBuilder();
+            if (indent) {
+                gsonBuilder.setPrettyPrinting();
+            }
+            System.out.println(gsonBuilder.create().toJson(new 
JsonParser().parse(json)));
+        }
+    }
+
     public CmdFunctionWorkerStats(PulsarAdmin admin) throws 
PulsarClientException {
         super("functions", admin);
-        functionsStats = new FunctionsStats();
-        jcommander.addCommand("functions", functionsStats);
+        jcommander.addCommand("functions", new FunctionsStats());
+        jcommander.addCommand("monitoring-metrics", new 
CmdMonitoringMetrics());
     }
 
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/JvmMetrics.java
 b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmMetrics.java
similarity index 87%
rename from 
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/JvmMetrics.java
rename to 
pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmMetrics.java
index d6870a5151..6353dfa7fe 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/JvmMetrics.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmMetrics.java
@@ -16,18 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.stats.metrics;
+package org.apache.pulsar.common.stats;
 
 import java.lang.management.ManagementFactory;
 import java.lang.reflect.Field;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 
-import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.common.stats.Metrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,7 +43,7 @@
 import io.netty.util.internal.PlatformDependent;
 import java.util.concurrent.atomic.AtomicLong;
 
-public class JvmMetrics extends AbstractMetrics {
+public class JvmMetrics {
 
     private volatile long accumulatedYoungGcCount = 0;
     private volatile long currentYoungGcCount = 0;
@@ -55,6 +57,8 @@
     
     private static final Logger log = 
LoggerFactory.getLogger(JvmMetrics.class);
     private static Field directMemoryUsage = null;
+    
+    private final String componentName;
     static {
         try {
             directMemoryUsage = 
PlatformDependent.class.getDeclaredField("DIRECT_MEMORY_COUNTER");
@@ -64,13 +68,14 @@
         }
     }
 
-    public JvmMetrics(PulsarService pulsar) {
-        super(pulsar);
-        pulsar.getExecutor().scheduleAtFixedRate(this::updateGcStats, 0, 1, 
TimeUnit.MINUTES);
+    public JvmMetrics(ScheduledExecutorService executor, String componentName) 
{
+        if (executor != null) {
+            executor.scheduleAtFixedRate(this::updateGcStats, 0, 1, 
TimeUnit.MINUTES);
+        }
+        this.componentName = componentName;
     }
 
     @SuppressWarnings("restriction")
-    @Override
     public List<Metrics> generate() {
 
         Metrics m = createMetrics();
@@ -105,8 +110,8 @@ public JvmMetrics(PulsarService pulsar) {
             }
         }
 
-        m.put("brk_default_pool_allocated", totalAllocated);
-        m.put("brk_default_pool_used", totalUsed);
+        m.put(this.componentName + "_default_pool_allocated", totalAllocated);
+        m.put(this.componentName + "_default_pool_used", totalUsed);
 
         return Lists.newArrayList(m);
     }
@@ -173,5 +178,14 @@ private long getThreadCount() {
 
         return parentThreadGroup.activeCount();
     }
+    
+    private Metrics createMetrics() {
+        return createMetrics(Collections.singletonMap("metric", 
"jvm_metrics"));
+    }
+
+    private Metrics createMetrics(Map<String, String> dimensionMap) {
+        // create with current version
+        return Metrics.create(dimensionMap);
+    }
 
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/Metrics.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/Metrics.java
index 0281da7774..045acc142a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/Metrics.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/Metrics.java
@@ -43,6 +43,11 @@
     @JsonInclude(content=Include.NON_EMPTY)
     final Map<String, String> dimensions;
 
+    public Metrics() {
+        metrics = Maps.newTreeMap();
+        dimensions = Maps.newHashMap();
+    }
+    
     // hide constructor
     protected Metrics(Map<String, String> unmodifiableDimensionMap) {
         this.metrics = Maps.newTreeMap();
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MetricsGenerator.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MetricsGenerator.java
new file mode 100644
index 0000000000..59e71d49b8
--- /dev/null
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MetricsGenerator.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.pulsar.common.stats.JvmMetrics;
+import org.apache.pulsar.common.stats.Metrics;
+
+public class MetricsGenerator {
+
+    private final JvmMetrics jvmMetrics;
+
+    public MetricsGenerator(ScheduledExecutorService executor) {
+        this.jvmMetrics = new JvmMetrics(executor, "fun");
+    }
+
+    public List<Metrics> generate() {
+        List<Metrics> metricsCollection = new ArrayList<Metrics>();
+        metricsCollection.addAll(jvmMetrics.generate());
+        // add more metrics here..
+
+        return metricsCollection;
+    }
+
+}
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 6af9c8f2bf..e4cf2cc99d 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -41,6 +41,7 @@
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.stats.JvmMetrics;
 
 /**
  * A service component contains everything to run a worker except rest server.
@@ -63,11 +64,13 @@
     private AuthenticationService authenticationService;
     private ConnectorsManager connectorsManager;
     private PulsarAdmin admin;
+    private final MetricsGenerator metricsGenerator;
 
     public WorkerService(WorkerConfig workerConfig) {
         this.workerConfig = workerConfig;
         this.statsUpdater = Executors
                 .newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("worker-stats-updater"));
+        this.metricsGenerator = new MetricsGenerator(this.statsUpdater);
     }
 
     public void start(URI dlogUri) throws InterruptedException {
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index c35dd52e80..b71e6cd327 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -60,6 +60,7 @@
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.join;
+
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -976,7 +977,25 @@ public boolean isAuthorizedRole(String tenant, String 
clientRole) throws PulsarA
     public boolean isSuperUser(String clientRole) {
         return clientRole != null && 
worker().getWorkerConfig().getSuperUserRoles().contains(clientRole);
     }
-    
+
+    public List<org.apache.pulsar.common.stats.Metrics> 
getWorkerMetrcis(String clientRole) throws IOException {
+        if (worker().getWorkerConfig().isAuthorizationEnabled() && 
!isSuperUser(clientRole)) {
+            log.error("Client [{}] is not admin and authorized to get 
function-stats", clientRole);
+            throw new 
WebApplicationException(Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(clientRole + " is not authorize to 
get metrics")).build());
+        }
+        return getWorkerMetrcis();
+    }
+
+    private List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrcis() {
+        if (!isWorkerServiceAvailable()) {
+            throw new WebApplicationException(
+                    
Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON)
+                            .entity(new ErrorData("Function worker service is 
not avaialable")).build());
+        }
+        return worker().getMetricsGenerator().generate();
+    }
+
     public Response getFunctionsMetrcis(String clientRole) throws IOException {
         if (worker().getWorkerConfig().isAuthorizationEnabled() && 
!isSuperUser(clientRole)) {
             log.error("Client [{}] is not admin and authorized to get 
function-stats", clientRole);
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStats.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStats.java
index 6e4ae5533d..146bb21694 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStats.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStats.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.functions.worker.rest.api.v2;
 
 import java.io.IOException;
+import java.util.Collection;
 
 import javax.ws.rs.Consumes;
 import javax.ws.rs.GET;
@@ -27,6 +28,7 @@
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
+import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
 import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
 
 import io.swagger.annotations.Api;
@@ -34,7 +36,6 @@
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
 
 @Slf4j
 @Path("/worker-stats")
@@ -45,10 +46,18 @@
 
     @GET
     @Path("/functions")
-    @ApiOperation(value = "Get metrics for all functions owned by worker", 
notes = "Requested should be executed by Monitoring agent on each worker to 
fetch the metrics", response = Metrics.class)
-    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+    @ApiOperation(value = "Get stats for all functions owned by worker", notes 
= "Request should be executed by Monitoring agent on each worker to fetch the 
function-metrics", response = Metrics.class)
+    @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have 
admin permission"),
             @ApiResponse(code = 503, message = "Worker service is not 
running") })
-    public Response getMetrics() throws IOException {
+    public Response getStats() throws IOException {
         return functions.getFunctionsMetrcis(clientAppId());
     }
+    
+    @GET
+    @Path("/metrics")
+    @ApiOperation(value = "Gets the metrics for Monitoring", notes = "Request 
should be executed by Monitoring agent on each worker to fetch the 
worker-metrics", response = org.apache.pulsar.common.stats.Metrics.class, 
responseContainer = "List")
+    @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have 
admin permission") })
+    public Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() 
throws Exception {
+        return functions.getWorkerMetrcis(clientAppId());
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to