This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 10c3511 Add worker specific system and jvm metrics (#2352) 10c3511 is described below commit 10c351134a36457d3734a356b798b9f18b8d8593 Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Fri Aug 10 00:21:55 2018 -0700 Add worker specific system and jvm metrics (#2352) --- .../apache/pulsar/broker/admin/v2/WorkerStats.java | 11 ++++++- .../broker/loadbalance/impl/LoadManagerShared.java | 4 +-- .../pulsar/broker/stats/MetricsGenerator.java | 4 +-- .../prometheus/PrometheusMetricsGenerator.java | 4 +-- .../stats/BookieClientsStatsGeneratorTest.java | 2 +- .../apache/pulsar/client/admin/WorkerStats.java | 9 ++++++ .../client/admin/internal/WorkerStatsImpl.java | 18 ++++++++++- .../pulsar/admin/cli/CmdFunctionWorkerStats.java | 24 +++++++++++--- .../apache/pulsar/common/stats}/JvmMetrics.java | 32 +++++++++++++------ .../org/apache/pulsar/common/stats/Metrics.java | 5 +++ .../pulsar/functions/worker/MetricsGenerator.java | 37 ++++++++++++++-------- .../pulsar/functions/worker/WorkerService.java | 3 ++ .../functions/worker/rest/api/FunctionsImpl.java | 21 +++++++++++- .../functions/worker/rest/api/v2/WorkerStats.java | 17 +++++++--- 14 files changed, 150 insertions(+), 41 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java index 962c483..5a0e4b7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.admin.v2; import java.io.IOException; +import java.util.Collection; import javax.ws.rs.GET; import javax.ws.rs.Path; @@ -41,7 +42,15 @@ public class WorkerStats extends FunctionApiResource { @ApiOperation(value = "Get metrics for all functions owned by worker", notes = "Requested should be executed by Monitoring agent on each worker to fetch the metrics", response = Metrics.class) @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 503, message = "Worker service is not running") }) - public Response getMetrics() throws IOException { + public Response getStats() throws IOException { return functions.getFunctionsMetrcis(clientAppId()); } + + @GET + @Path("/metrics") + @ApiOperation(value = "Gets the metrics for Monitoring", notes = "Request should be executed by Monitoring agent on each worker to fetch the worker-metrics", response = org.apache.pulsar.common.stats.Metrics.class, responseContainer = "List") + @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have admin permission") }) + public Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() throws Exception { + return functions.getWorkerMetrcis(clientAppId()); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java index 80cb31c..6a5575c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java @@ -41,7 +41,7 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.loadbalance.BrokerHostUsage; import org.apache.pulsar.broker.loadbalance.LoadData; -import org.apache.pulsar.broker.stats.metrics.JvmMetrics; +import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.ServiceUnitId; @@ -230,7 +230,7 @@ public class LoadManagerShared { systemResourceUsage.memory.limit = (double) maxHeapMemoryInBytes / MIBI; // Collect JVM direct memory - systemResourceUsage.directMemory.usage = (double) (JvmMetrics.getJvmDirectMemoryUsed() / MIBI); + systemResourceUsage.directMemory.usage = (double) (getJvmDirectMemoryUsed() / MIBI); systemResourceUsage.directMemory.limit = (double) (PlatformDependent.maxDirectMemory() / MIBI); return systemResourceUsage; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/MetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/MetricsGenerator.java index 9472a7d..3aaa955 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/MetricsGenerator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/MetricsGenerator.java @@ -24,7 +24,7 @@ import java.util.List; import java.util.Map; import org.apache.pulsar.broker.PulsarService; -import org.apache.pulsar.broker.stats.metrics.JvmMetrics; +import org.apache.pulsar.common.stats.JvmMetrics; import org.apache.pulsar.broker.stats.metrics.ManagedLedgerCacheMetrics; import org.apache.pulsar.broker.stats.metrics.ManagedLedgerMetrics; import org.apache.pulsar.common.stats.Metrics; @@ -38,7 +38,7 @@ public class MetricsGenerator { public MetricsGenerator(PulsarService pulsar) { this.pulsar = pulsar; - this.jvmMetrics = new JvmMetrics(pulsar); + this.jvmMetrics = new JvmMetrics(pulsar.getExecutor(), "brk"); } public Collection<Metrics> generate() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java index 8f0d30d..5c42b51 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java @@ -23,7 +23,7 @@ import java.io.OutputStream; import java.util.Enumeration; import org.apache.pulsar.broker.PulsarService; -import org.apache.pulsar.broker.stats.metrics.JvmMetrics; +import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed; import org.apache.pulsar.common.util.SimpleTextOutputStream; import io.netty.buffer.ByteBuf; @@ -51,7 +51,7 @@ public class PrometheusMetricsGenerator { Gauge.build("jvm_memory_direct_bytes_used", "-").create().setChild(new Child() { @Override public double get() { - return JvmMetrics.getJvmDirectMemoryUsed(); + return getJvmDirectMemoryUsed(); } }).register(CollectorRegistry.defaultRegistry); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BookieClientsStatsGeneratorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BookieClientsStatsGeneratorTest.java index 251ed57..332bc07 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BookieClientsStatsGeneratorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BookieClientsStatsGeneratorTest.java @@ -25,7 +25,7 @@ import java.util.Map; import org.apache.bookkeeper.mledger.proto.PendingBookieOpsStats; import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.broker.stats.BookieClientStatsGenerator; -import org.apache.pulsar.broker.stats.metrics.JvmMetrics; +import org.apache.pulsar.common.stats.JvmMetrics; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/WorkerStats.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/WorkerStats.java index 069e58f..4fc242f 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/WorkerStats.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/WorkerStats.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.client.admin; +import java.util.Collection; + import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics; /** @@ -32,4 +34,11 @@ public interface WorkerStats { * @throws PulsarAdminException */ Metrics getFunctionsStats() throws PulsarAdminException; + + /** + * Get worker metrics. + * @return + * @throws PulsarAdminException + */ + Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() throws PulsarAdminException; } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerStatsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerStatsImpl.java index 5b6762c..f492d31 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerStatsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerStatsImpl.java @@ -18,15 +18,20 @@ */ package org.apache.pulsar.client.admin.internal; +import static org.apache.pulsar.client.admin.internal.FunctionsImpl.mergeJson; + +import java.util.Collection; +import java.util.List; + import javax.ws.rs.ClientErrorException; import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.GenericType; import javax.ws.rs.core.Response; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.WorkerStats; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics; -import static org.apache.pulsar.client.admin.internal.FunctionsImpl.mergeJson; import lombok.extern.slf4j.Slf4j; @@ -55,4 +60,15 @@ public class WorkerStatsImpl extends BaseResource implements WorkerStats { throw getApiException(e); } } + + @Override + public Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() throws PulsarAdminException { + try { + return request(workerStats.path("metrics")) + .get(new GenericType<List<org.apache.pulsar.common.stats.Metrics>>() { + }); + } catch (Exception e) { + throw getApiException(e); + } + } } \ No newline at end of file diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorkerStats.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorkerStats.java index 640d7d7..6bd22ae 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorkerStats.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorkerStats.java @@ -24,6 +24,7 @@ import org.apache.pulsar.functions.utils.Utils; import com.beust.jcommander.Parameter; import com.beust.jcommander.Parameters; +import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.JsonParser; @@ -34,8 +35,6 @@ import lombok.extern.slf4j.Slf4j; @Parameters(commandDescription = "Operations to collect function-worker statistics") public class CmdFunctionWorkerStats extends CmdBase { - private final FunctionsStats functionsStats; - /** * Base command */ @@ -70,10 +69,27 @@ public class CmdFunctionWorkerStats extends CmdBase { } } + @Parameters(commandDescription = "dump metrics for Monitoring") + class CmdMonitoringMetrics extends BaseCommand { + + @Parameter(names = { "-i", "--indent" }, description = "Indent JSON output", required = false) + boolean indent = false; + + @Override + void runCmd() throws Exception { + String json = new Gson().toJson(admin.workerStats().getMetrics()); + GsonBuilder gsonBuilder = new GsonBuilder(); + if (indent) { + gsonBuilder.setPrettyPrinting(); + } + System.out.println(gsonBuilder.create().toJson(new JsonParser().parse(json))); + } + } + public CmdFunctionWorkerStats(PulsarAdmin admin) throws PulsarClientException { super("functions", admin); - functionsStats = new FunctionsStats(); - jcommander.addCommand("functions", functionsStats); + jcommander.addCommand("functions", new FunctionsStats()); + jcommander.addCommand("monitoring-metrics", new CmdMonitoringMetrics()); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/JvmMetrics.java b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmMetrics.java similarity index 87% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/JvmMetrics.java rename to pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmMetrics.java index d6870a5..6353dfa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/JvmMetrics.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmMetrics.java @@ -16,18 +16,20 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.stats.metrics; +package org.apache.pulsar.common.stats; import java.lang.management.ManagementFactory; import java.lang.reflect.Field; +import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import javax.management.MBeanServer; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; -import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.common.stats.Metrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,7 +43,7 @@ import io.netty.buffer.PooledByteBufAllocator; import io.netty.util.internal.PlatformDependent; import java.util.concurrent.atomic.AtomicLong; -public class JvmMetrics extends AbstractMetrics { +public class JvmMetrics { private volatile long accumulatedYoungGcCount = 0; private volatile long currentYoungGcCount = 0; @@ -55,6 +57,8 @@ public class JvmMetrics extends AbstractMetrics { private static final Logger log = LoggerFactory.getLogger(JvmMetrics.class); private static Field directMemoryUsage = null; + + private final String componentName; static { try { directMemoryUsage = PlatformDependent.class.getDeclaredField("DIRECT_MEMORY_COUNTER"); @@ -64,13 +68,14 @@ public class JvmMetrics extends AbstractMetrics { } } - public JvmMetrics(PulsarService pulsar) { - super(pulsar); - pulsar.getExecutor().scheduleAtFixedRate(this::updateGcStats, 0, 1, TimeUnit.MINUTES); + public JvmMetrics(ScheduledExecutorService executor, String componentName) { + if (executor != null) { + executor.scheduleAtFixedRate(this::updateGcStats, 0, 1, TimeUnit.MINUTES); + } + this.componentName = componentName; } @SuppressWarnings("restriction") - @Override public List<Metrics> generate() { Metrics m = createMetrics(); @@ -105,8 +110,8 @@ public class JvmMetrics extends AbstractMetrics { } } - m.put("brk_default_pool_allocated", totalAllocated); - m.put("brk_default_pool_used", totalUsed); + m.put(this.componentName + "_default_pool_allocated", totalAllocated); + m.put(this.componentName + "_default_pool_used", totalUsed); return Lists.newArrayList(m); } @@ -173,5 +178,14 @@ public class JvmMetrics extends AbstractMetrics { return parentThreadGroup.activeCount(); } + + private Metrics createMetrics() { + return createMetrics(Collections.singletonMap("metric", "jvm_metrics")); + } + + private Metrics createMetrics(Map<String, String> dimensionMap) { + // create with current version + return Metrics.create(dimensionMap); + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/Metrics.java b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/Metrics.java index 0281da7..045acc1 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/Metrics.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/Metrics.java @@ -43,6 +43,11 @@ public class Metrics { @JsonInclude(content=Include.NON_EMPTY) final Map<String, String> dimensions; + public Metrics() { + metrics = Maps.newTreeMap(); + dimensions = Maps.newHashMap(); + } + // hide constructor protected Metrics(Map<String, String> unmodifiableDimensionMap) { this.metrics = Maps.newTreeMap(); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/WorkerStats.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MetricsGenerator.java similarity index 53% copy from pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/WorkerStats.java copy to pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MetricsGenerator.java index 069e58f..59e71d4 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/WorkerStats.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MetricsGenerator.java @@ -16,20 +16,29 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.client.admin; +package org.apache.pulsar.functions.worker; -import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; + +import org.apache.pulsar.common.stats.JvmMetrics; +import org.apache.pulsar.common.stats.Metrics; + +public class MetricsGenerator { + + private final JvmMetrics jvmMetrics; + + public MetricsGenerator(ScheduledExecutorService executor) { + this.jvmMetrics = new JvmMetrics(executor, "fun"); + } + + public List<Metrics> generate() { + List<Metrics> metricsCollection = new ArrayList<Metrics>(); + metricsCollection.addAll(jvmMetrics.generate()); + // add more metrics here.. + + return metricsCollection; + } -/** - * Admin interface for worker stats management. - */ -public interface WorkerStats { - - - /** - * Get all functions stats on a worker - * @return - * @throws PulsarAdminException - */ - Metrics getFunctionsStats() throws PulsarAdminException; } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java index 6af9c8f..e4cf2cc 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java @@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; +import org.apache.pulsar.common.stats.JvmMetrics; /** * A service component contains everything to run a worker except rest server. @@ -63,11 +64,13 @@ public class WorkerService { private AuthenticationService authenticationService; private ConnectorsManager connectorsManager; private PulsarAdmin admin; + private final MetricsGenerator metricsGenerator; public WorkerService(WorkerConfig workerConfig) { this.workerConfig = workerConfig; this.statsUpdater = Executors .newSingleThreadScheduledExecutor(new DefaultThreadFactory("worker-stats-updater")); + this.metricsGenerator = new MetricsGenerator(this.statsUpdater); } public void start(URI dlogUri) throws InterruptedException { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index c35dd52..b71e6cd 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -60,6 +60,7 @@ import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.commons.lang3.StringUtils.isEmpty; import static org.apache.commons.lang3.StringUtils.join; + import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; @@ -976,7 +977,25 @@ public class FunctionsImpl { public boolean isSuperUser(String clientRole) { return clientRole != null && worker().getWorkerConfig().getSuperUserRoles().contains(clientRole); } - + + public List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrcis(String clientRole) throws IOException { + if (worker().getWorkerConfig().isAuthorizationEnabled() && !isSuperUser(clientRole)) { + log.error("Client [{}] is not admin and authorized to get function-stats", clientRole); + throw new WebApplicationException(Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(clientRole + " is not authorize to get metrics")).build()); + } + return getWorkerMetrcis(); + } + + private List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrcis() { + if (!isWorkerServiceAvailable()) { + throw new WebApplicationException( + Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData("Function worker service is not avaialable")).build()); + } + return worker().getMetricsGenerator().generate(); + } + public Response getFunctionsMetrcis(String clientRole) throws IOException { if (worker().getWorkerConfig().isAuthorizationEnabled() && !isSuperUser(clientRole)) { log.error("Client [{}] is not admin and authorized to get function-stats", clientRole); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStats.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStats.java index 6e4ae55..146bb21 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStats.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStats.java @@ -19,6 +19,7 @@ package org.apache.pulsar.functions.worker.rest.api.v2; import java.io.IOException; +import java.util.Collection; import javax.ws.rs.Consumes; import javax.ws.rs.GET; @@ -27,6 +28,7 @@ import javax.ws.rs.Produces; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics; import org.apache.pulsar.functions.worker.rest.FunctionApiResource; import io.swagger.annotations.Api; @@ -34,7 +36,6 @@ import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics; @Slf4j @Path("/worker-stats") @@ -45,10 +46,18 @@ public class WorkerStats extends FunctionApiResource { @GET @Path("/functions") - @ApiOperation(value = "Get metrics for all functions owned by worker", notes = "Requested should be executed by Monitoring agent on each worker to fetch the metrics", response = Metrics.class) - @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiOperation(value = "Get stats for all functions owned by worker", notes = "Request should be executed by Monitoring agent on each worker to fetch the function-metrics", response = Metrics.class) + @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have admin permission"), @ApiResponse(code = 503, message = "Worker service is not running") }) - public Response getMetrics() throws IOException { + public Response getStats() throws IOException { return functions.getFunctionsMetrcis(clientAppId()); } + + @GET + @Path("/metrics") + @ApiOperation(value = "Gets the metrics for Monitoring", notes = "Request should be executed by Monitoring agent on each worker to fetch the worker-metrics", response = org.apache.pulsar.common.stats.Metrics.class, responseContainer = "List") + @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have admin permission") }) + public Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() throws Exception { + return functions.getWorkerMetrcis(clientAppId()); + } }