rdhabalia closed pull request #2296: REST and CLI to get function metrics in json for monitoring URL: https://github.com/apache/incubator-pulsar/pull/2296
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java new file mode 100644 index 0000000000..962c4835d6 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin.v2; + +import java.io.IOException; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.core.Response; + +import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics; +import org.apache.pulsar.functions.worker.rest.FunctionApiResource; + +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Path("/worker-stats") +public class WorkerStats extends FunctionApiResource { + + @GET + @Path("/functions") + @ApiOperation(value = "Get metrics for all functions owned by worker", notes = "Requested should be executed by Monitoring agent on each worker to fetch the metrics", response = Metrics.class) + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 503, message = "Worker service is not running") }) + public Response getMetrics() throws IOException { + return functions.getFunctionsMetrcis(clientAppId()); + } +} diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java index 293304f00f..c04873da8e 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java @@ -27,6 +27,7 @@ import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList; +import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics; import org.apache.pulsar.functions.worker.WorkerInfo; /** diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java index 48aea94289..7a41fe9809 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java @@ -43,6 +43,7 @@ import org.apache.pulsar.client.admin.internal.NonPersistentTopicsImpl; import org.apache.pulsar.client.admin.internal.SchemasImpl; import org.apache.pulsar.client.admin.internal.TopicsImpl; +import org.apache.pulsar.client.admin.internal.WorkerStatsImpl; import org.apache.pulsar.client.admin.internal.TenantsImpl; import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl; import org.apache.pulsar.client.admin.internal.ResourceQuotasImpl; @@ -84,6 +85,7 @@ private final String serviceUrl; private final Lookup lookups; private final Functions functions; + private final WorkerStats workerStats; private final Schemas schemas; protected final WebTarget root; protected final Authentication auth; @@ -187,6 +189,7 @@ public PulsarAdmin(String serviceUrl, ClientConfigurationData clientConfigData) this.resourceQuotas = new ResourceQuotasImpl(root, auth); this.lookups = new LookupImpl(root, auth, useTls); this.functions = new FunctionsImpl(root, auth); + this.workerStats = new WorkerStatsImpl(root, auth); this.schemas = new SchemasImpl(root, auth); this.bookies = new BookiesImpl(root, auth); } @@ -354,6 +357,14 @@ public Functions functions() { return functions; } + /** + * + * @return the Worker stats + */ + public WorkerStats workerStats() { + return workerStats; + } + /** * @return the broker statics */ diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/WorkerStats.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/WorkerStats.java new file mode 100644 index 0000000000..069e58ff6e --- /dev/null +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/WorkerStats.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.admin; + +import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics; + +/** + * Admin interface for worker stats management. + */ +public interface WorkerStats { + + + /** + * Get all functions stats on a worker + * @return + * @throws PulsarAdminException + */ + Metrics getFunctionsStats() throws PulsarAdminException; +} diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java index fdb3588e40..028da3c2b1 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java @@ -281,7 +281,7 @@ public void downloadFunction(String destinationPath, String path) throws PulsarA throw getApiException(e); } } - + public static void mergeJson(String json, Builder builder) throws IOException { JsonFormat.parser().merge(json, builder); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerStatsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerStatsImpl.java new file mode 100644 index 0000000000..5b6762c5a1 --- /dev/null +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerStatsImpl.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.admin.internal; + +import javax.ws.rs.ClientErrorException; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Response; + +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.admin.WorkerStats; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics; +import static org.apache.pulsar.client.admin.internal.FunctionsImpl.mergeJson; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class WorkerStatsImpl extends BaseResource implements WorkerStats { + + private final WebTarget workerStats; + + public WorkerStatsImpl(WebTarget web, Authentication auth) { + super(auth); + this.workerStats = web.path("/admin/worker-stats"); + } + + @Override + public Metrics getFunctionsStats() throws PulsarAdminException { + try { + Response response = request(workerStats.path("functions")).get(); + if (!response.getStatusInfo().equals(Response.Status.OK)) { + throw new ClientErrorException(response); + } + String jsonResponse = response.readEntity(String.class); + Metrics.Builder metricsBuilder = Metrics.newBuilder(); + mergeJson(jsonResponse, metricsBuilder); + return metricsBuilder.build(); + } catch (Exception e) { + throw getApiException(e); + } + } +} \ No newline at end of file diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorkerStats.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorkerStats.java new file mode 100644 index 0000000000..640d7d7c1c --- /dev/null +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorkerStats.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.admin.cli; + +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.functions.utils.Utils; + +import com.beust.jcommander.Parameter; +import com.beust.jcommander.Parameters; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonParser; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Parameters(commandDescription = "Operations to collect function-worker statistics") +public class CmdFunctionWorkerStats extends CmdBase { + + private final FunctionsStats functionsStats; + + /** + * Base command + */ + @Getter + abstract class BaseCommand extends CliCommand { + @Override + void run() throws Exception { + processArguments(); + runCmd(); + } + + void processArguments() throws Exception { + } + + abstract void runCmd() throws Exception; + } + + @Parameters(commandDescription = "dump all functions stats") + class FunctionsStats extends BaseCommand { + + @Parameter(names = { "-i", "--indent" }, description = "Indent JSON output", required = false) + boolean indent = false; + + @Override + void runCmd() throws Exception { + String json = Utils.printJson(admin.workerStats().getFunctionsStats()); + GsonBuilder gsonBuilder = new GsonBuilder(); + if (indent) { + gsonBuilder.setPrettyPrinting(); + } + System.out.println(gsonBuilder.create().toJson(new JsonParser().parse(json))); + } + } + + public CmdFunctionWorkerStats(PulsarAdmin admin) throws PulsarClientException { + super("functions", admin); + functionsStats = new FunctionsStats(); + jcommander.addCommand("functions", functionsStats); + } + +} diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 23a6cb868e..646af1e012 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -1033,7 +1033,7 @@ void runCmd() throws Exception { System.out.println(gson.toJson(new JsonParser().parse(json))); } } - + public CmdFunctions(PulsarAdmin admin) throws PulsarClientException { super("functions", admin); localRunner = new LocalRunner(); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java index 60cb84e6ab..f30b8874f6 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java @@ -99,6 +99,7 @@ commandMap.put("resource-quotas", CmdResourceQuotas.class); commandMap.put("functions", CmdFunctions.class); + commandMap.put("functions-worker-stats", CmdFunctionWorkerStats.class); commandMap.put("source", CmdSources.class); commandMap.put("sink", CmdSinks.class); } diff --git a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto index 0078bc2544..65d1b2f966 100644 --- a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto +++ b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto @@ -79,3 +79,12 @@ service InstanceControl { rpc GetMetrics(google.protobuf.Empty) returns (MetricsData) {} rpc HealthCheck(google.protobuf.Empty) returns (HealthCheckResult) {} } + +message Metrics { + message InstanceMetrics { + string name = 1; + int32 instanceId = 2; + MetricsData metricsData = 3; + } + repeated InstanceMetrics metrics = 1; +} \ No newline at end of file diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java index ff43e2501b..8b73c13106 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java @@ -20,6 +20,7 @@ import org.apache.pulsar.functions.worker.rest.api.FunctionsMetricsResource; import org.apache.pulsar.functions.worker.rest.api.v2.FunctionApiV2Resource; +import org.apache.pulsar.functions.worker.rest.api.v2.WorkerStats; import org.glassfish.jersey.media.multipart.MultiPartFeature; import java.util.Arrays; @@ -36,6 +37,7 @@ private Resources() { return new HashSet<>( Arrays.asList( FunctionApiV2Resource.class, + WorkerStats.class, MultiPartFeature.class )); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index 50c09a075f..c35dd52e80 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -77,8 +77,14 @@ import org.apache.pulsar.functions.proto.Function.SourceSpec; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; +import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics; +import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics.Builder; +import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics.InstanceMetrics; +import org.apache.pulsar.functions.runtime.Runtime; +import org.apache.pulsar.functions.runtime.RuntimeSpawner; import org.apache.pulsar.functions.utils.functioncache.FunctionClassLoaders; import org.apache.pulsar.functions.worker.FunctionMetaDataManager; +import org.apache.pulsar.functions.worker.FunctionRuntimeInfo; import org.apache.pulsar.functions.worker.FunctionRuntimeManager; import org.apache.pulsar.functions.worker.MembershipManager; import org.apache.pulsar.functions.worker.Utils; @@ -127,11 +133,11 @@ public Response registerFunction(final String tenant, final String namespace, fi if (!isWorkerServiceAvailable()) { return getUnavailableResponse(); } - + try { if (!isAuthorizedRole(tenant, clientRole)) { - log.error("{}/{}/{} Client [{}] is not admin and authorized to register function", tenant, namespace, functionName, - clientRole); + log.error("{}/{}/{} Client [{}] is not admin and authorized to register function", tenant, namespace, + functionName, clientRole); return Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON) .entity(new ErrorData("client is not authorize to perform operation")).build(); } @@ -204,7 +210,7 @@ public Response updateFunction(final String tenant, final String namespace, fina return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON) .entity(new ErrorData(e.getMessage())).build(); } - + FunctionDetails functionDetails; boolean isPkgUrlProvided = isNotBlank(functionPkgUrl); // validate parameters @@ -267,7 +273,7 @@ public Response deregisterFunction(final String tenant, final String namespace, return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON) .entity(new ErrorData(e.getMessage())).build(); } - + // validate parameters try { validateDeregisterRequestParams(tenant, namespace, functionName); @@ -511,9 +517,9 @@ public WorkerInfo getClusterLeader() { WorkerInfo leader = membershipManager.getLeader(); if (leader == null) { - throw new WebApplicationException( - Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON) - .entity(new ErrorData("Leader cannot be determined")).build());} + throw new WebApplicationException(Response.status(Status.INTERNAL_SERVER_ERROR) + .type(MediaType.APPLICATION_JSON).entity(new ErrorData("Leader cannot be determined")).build()); + } return leader; } @@ -781,7 +787,6 @@ private String getFunctionCodeBuiltin(FunctionDetails functionDetails) { return null; } - private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String functionName, String functionDetailsJson, File jarWithFileUrl) throws IllegalArgumentException { if (tenant == null) { @@ -858,12 +863,12 @@ private void validateFunctionClassTypes(File jarFile, FunctionDetails.Builder fu // validate function class-type Object functionObject = createInstance(functionDetailsBuilder.getClassName(), classLoader); Class<?>[] typeArgs = org.apache.pulsar.functions.utils.Utils.getFunctionTypes(functionObject, false); - + if (!(functionObject instanceof org.apache.pulsar.functions.api.Function) && !(functionObject instanceof java.util.function.Function)) { throw new RuntimeException("User class must either be Function or java.util.Function"); } - + if (functionDetailsBuilder.hasSource() && functionDetailsBuilder.getSource() != null && isNotBlank(functionDetailsBuilder.getSource().getClassName())) { try { @@ -873,8 +878,7 @@ private void validateFunctionClassTypes(File jarFile, FunctionDetails.Builder fu .setSource(functionDetailsBuilder.getSourceBuilder().setTypeClassName(argClassName)); // if sink-class not present then set same arg as source - if (!functionDetailsBuilder.hasSink() - || isBlank(functionDetailsBuilder.getSink().getClassName())) { + if (!functionDetailsBuilder.hasSink() || isBlank(functionDetailsBuilder.getSink().getClassName())) { functionDetailsBuilder .setSink(functionDetailsBuilder.getSinkBuilder().setTypeClassName(argClassName)); } @@ -899,8 +903,7 @@ private void validateFunctionClassTypes(File jarFile, FunctionDetails.Builder fu functionDetailsBuilder.setSink(functionDetailsBuilder.getSinkBuilder().setTypeClassName(argClassName)); // if source-class not present then set same arg as sink - if (!functionDetailsBuilder.hasSource() - || isBlank(functionDetailsBuilder.getSource().getClassName())) { + if (!functionDetailsBuilder.hasSource() || isBlank(functionDetailsBuilder.getSource().getClassName())) { functionDetailsBuilder .setSource(functionDetailsBuilder.getSourceBuilder().setTypeClassName(argClassName)); } @@ -911,7 +914,7 @@ private void validateFunctionClassTypes(File jarFile, FunctionDetails.Builder fu log.error("Failed to validate sink class", e); throw new IllegalArgumentException("Failed to validate sink class-name", e); } - } else if(isBlank(functionDetailsBuilder.getSinkBuilder().getTypeClassName())){ + } else if (isBlank(functionDetailsBuilder.getSinkBuilder().getTypeClassName())) { // if function-sink-class is not present then set function-sink type-class according to function class functionDetailsBuilder .setSink(functionDetailsBuilder.getSinkBuilder().setTypeClassName(typeArgs[1].getName())); @@ -956,11 +959,11 @@ public static String createPackagePath(String tenant, String namespace, String f return String.format("%s/%s/%s/%s", tenant, namespace, Codec.encode(functionName), Utils.getUniquePackageName(Codec.encode(fileName))); } - - private boolean isAuthorizedRole(String tenant, String clientRole) throws PulsarAdminException { + + public boolean isAuthorizedRole(String tenant, String clientRole) throws PulsarAdminException { if (worker().getWorkerConfig().isAuthorizationEnabled()) { // skip authorization if client role is super-user - if (clientRole != null && worker().getWorkerConfig().getSuperUserRoles().contains(clientRole)) { + if (isSuperUser(clientRole)) { return true; } TenantInfo tenantInfo = worker().getAdmin().tenants().getTenantInfo(tenant); @@ -970,4 +973,66 @@ private boolean isAuthorizedRole(String tenant, String clientRole) throws Pulsar return true; } + public boolean isSuperUser(String clientRole) { + return clientRole != null && worker().getWorkerConfig().getSuperUserRoles().contains(clientRole); + } + + public Response getFunctionsMetrcis(String clientRole) throws IOException { + if (worker().getWorkerConfig().isAuthorizationEnabled() && !isSuperUser(clientRole)) { + log.error("Client [{}] is not admin and authorized to get function-stats", clientRole); + return Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData("client is not authorize to perform operation")).build(); + } + return getFunctionsMetrcis(); + } + + private Response getFunctionsMetrcis() throws IOException { + if (!isWorkerServiceAvailable()) { + return getUnavailableResponse(); + } + + WorkerService workerService = worker(); + Map<String, FunctionRuntimeInfo> functionRuntimes = workerService.getFunctionRuntimeManager() + .getFunctionRuntimeInfos(); + + Metrics.Builder metricsBuilder = Metrics.newBuilder(); + for (Map.Entry<String, FunctionRuntimeInfo> entry : functionRuntimes.entrySet()) { + String fullyQualifiedInstanceName = entry.getKey(); + FunctionRuntimeInfo functionRuntimeInfo = entry.getValue(); + RuntimeSpawner functionRuntimeSpawner = functionRuntimeInfo.getRuntimeSpawner(); + + if (functionRuntimeSpawner != null) { + Runtime functionRuntime = functionRuntimeSpawner.getRuntime(); + if (functionRuntime != null) { + try { + InstanceCommunication.MetricsData metricsData = workerService.getWorkerConfig() + .getMetricsSamplingPeriodSec() > 0 ? functionRuntime.getMetrics().get() + : functionRuntime.getAndResetMetrics().get(); + + String tenant = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData() + .getFunctionDetails().getTenant(); + String namespace = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData() + .getFunctionDetails().getNamespace(); + String name = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData() + .getFunctionDetails().getName(); + int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId(); + String qualifiedFunctionName = String.format("%s/%s/%s", tenant, namespace, name); + + InstanceMetrics.Builder instanceBuilder = InstanceMetrics.newBuilder(); + instanceBuilder.setName(qualifiedFunctionName); + instanceBuilder.setInstanceId(instanceId); + if (metricsData != null) { + instanceBuilder.setMetricsData(metricsData); + } + metricsBuilder.addMetrics(instanceBuilder.build()); + } catch (InterruptedException | ExecutionException e) { + log.warn("Failed to collect metrics for function instance {}", fullyQualifiedInstanceName, e); + } + } + } + } + String jsonResponse = org.apache.pulsar.functions.utils.Utils.printJson(metricsBuilder); + return Response.status(Status.OK).entity(jsonResponse).build(); + } + } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStats.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStats.java new file mode 100644 index 0000000000..6e4ae5533d --- /dev/null +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStats.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.worker.rest.api.v2; + +import java.io.IOException; + +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +import org.apache.pulsar.functions.worker.rest.FunctionApiResource; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics; + +@Slf4j +@Path("/worker-stats") +@Produces(MediaType.APPLICATION_JSON) +@Consumes(MediaType.APPLICATION_JSON) +@Api(value = "/worker-stats", description = "Workers admin api", tags = "workers") +public class WorkerStats extends FunctionApiResource { + + @GET + @Path("/functions") + @ApiOperation(value = "Get metrics for all functions owned by worker", notes = "Requested should be executed by Monitoring agent on each worker to fetch the metrics", response = Metrics.class) + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 503, message = "Worker service is not running") }) + public Response getMetrics() throws IOException { + return functions.getFunctionsMetrcis(clientAppId()); + } +} diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java index 67de21a4ee..4dda58fb63 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java @@ -231,8 +231,8 @@ public void onSuccess(UserRecordResult result) { @Override public void onFailure(Throwable exception) { - LOG.error("[{}] Failed to published message for replicator of {}-{} ", kinesisSink.streamName, - resultContext.getPartitionId(), resultContext.getRecordSequence()); + LOG.error("[{}] Failed to published message for replicator of {}-{}, {} ", kinesisSink.streamName, + resultContext.getPartitionId(), resultContext.getRecordSequence(), exception.getMessage()); kinesisSink.previousPublishFailed = TRUE; if (kinesisSink.sinkContext != null) { kinesisSink.sinkContext.recordMetric(METRICS_TOTAL_FAILURE, 1); diff --git a/site2/website/scripts/replace.js b/site2/website/scripts/replace.js index 2c83b03123..3af4af26a2 100644 --- a/site2/website/scripts/replace.js +++ b/site2/website/scripts/replace.js @@ -120,5 +120,4 @@ for (v of versions) { dry: true }; doReplace(opts); -} - +} \ No newline at end of file ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
