rdhabalia closed pull request #2295: REST and CLI to get function metrics in 
json for monitoring
URL: https://github.com/apache/incubator-pulsar/pull/2295
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
index 293304f00f..02891be1e1 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -27,6 +27,7 @@
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import 
org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
+import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
 import org.apache.pulsar.functions.worker.WorkerInfo;
 
 /**
@@ -259,4 +260,11 @@
      * @throws PulsarAdminException 
      */
     List<WorkerInfo> getCluster() throws PulsarAdminException;
+    
+    /**
+     * Get function worker metrics
+     * @return
+     * @throws PulsarAdminException 
+     */
+    Metrics getMetrics() throws PulsarAdminException;
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index fdb3588e40..b9e52646c6 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -47,6 +47,7 @@
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import 
org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
+import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
 import org.apache.pulsar.functions.worker.WorkerInfo;
 import org.glassfish.jersey.media.multipart.FormDataBodyPart;
 import org.glassfish.jersey.media.multipart.FormDataMultiPart;
@@ -281,7 +282,23 @@ public void downloadFunction(String destinationPath, 
String path) throws PulsarA
             throw getApiException(e);
         }
     }
-    
+
+    @Override
+    public Metrics getMetrics() throws PulsarAdminException {
+        try {
+            Response response = request(functions.path("metrics")).get();
+           if (!response.getStatusInfo().equals(Response.Status.OK)) {
+               throw new ClientErrorException(response);
+           }
+           String jsonResponse = response.readEntity(String.class);
+           Metrics.Builder metricsBuilder = Metrics.newBuilder();
+           mergeJson(jsonResponse, metricsBuilder);
+           return metricsBuilder.build();
+       } catch (Exception e) {
+           throw getApiException(e);
+       }
+   }
+
     public static void mergeJson(String json, Builder builder) throws 
IOException {
         JsonFormat.parser().merge(json, builder);
     }
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 9f7405cead..27a48e084e 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -109,6 +109,7 @@
     private final UploadFunction uploader;
     private final DownloadFunction downloader;
     private final GetCluster cluster;
+    private final Metrics metrics;
 
     /**
      * Base command
@@ -899,7 +900,17 @@ void runCmd() throws Exception {
             System.out.println(gson.toJson(new JsonParser().parse(json)));
         }
     }
-    
+
+    @Parameters(commandDescription = "Get function metrics")
+    class Metrics extends BaseCommand {
+        @Override
+        void runCmd() throws Exception {
+            String json = Utils.printJson(admin.functions().getMetrics());
+            Gson gson = new GsonBuilder().setPrettyPrinting().create();
+            System.out.println(gson.toJson(new JsonParser().parse(json)));
+        }
+    }
+
     public CmdFunctions(PulsarAdmin admin) throws PulsarClientException {
         super("functions", admin);
         localRunner = new LocalRunner();
@@ -914,6 +925,7 @@ public CmdFunctions(PulsarAdmin admin) throws 
PulsarClientException {
         uploader = new UploadFunction();
         downloader = new DownloadFunction();
         cluster = new GetCluster();
+        metrics = new Metrics();
         jcommander.addCommand("localrun", getLocalRunner());
         jcommander.addCommand("create", getCreater());
         jcommander.addCommand("delete", getDeleter());
@@ -926,6 +938,7 @@ public CmdFunctions(PulsarAdmin admin) throws 
PulsarClientException {
         jcommander.addCommand("upload", getUploader());
         jcommander.addCommand("download", getDownloader());
         jcommander.addCommand("cluster", cluster);
+        jcommander.addCommand("metrics", metrics);
     }
 
     @VisibleForTesting
diff --git a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto 
b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
index 0078bc2544..65d1b2f966 100644
--- a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
+++ b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
@@ -79,3 +79,12 @@ service InstanceControl {
     rpc GetMetrics(google.protobuf.Empty) returns (MetricsData) {}
     rpc HealthCheck(google.protobuf.Empty) returns (HealthCheckResult) {}
 }
+
+message Metrics {
+       message InstanceMetrics {
+               string name = 1;
+               int32 instanceId = 2;
+               MetricsData metricsData = 3;
+       }
+       repeated InstanceMetrics metrics = 1;
+}
\ No newline at end of file
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 50c09a075f..94e65842b5 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -77,8 +77,14 @@
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
+import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics.Builder;
+import 
org.apache.pulsar.functions.proto.InstanceCommunication.Metrics.InstanceMetrics;
+import org.apache.pulsar.functions.runtime.Runtime;
+import org.apache.pulsar.functions.runtime.RuntimeSpawner;
 import org.apache.pulsar.functions.utils.functioncache.FunctionClassLoaders;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
+import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.MembershipManager;
 import org.apache.pulsar.functions.worker.Utils;
@@ -127,11 +133,11 @@ public Response registerFunction(final String tenant, 
final String namespace, fi
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
         }
-        
+
         try {
             if (!isAuthorizedRole(tenant, clientRole)) {
-                log.error("{}/{}/{} Client [{}] is not admin and authorized to 
register function", tenant, namespace, functionName,
-                        clientRole);
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to 
register function", tenant, namespace,
+                        functionName, clientRole);
                 return 
Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
                         .entity(new ErrorData("client is not authorize to 
perform operation")).build();
             }
@@ -204,7 +210,7 @@ public Response updateFunction(final String tenant, final 
String namespace, fina
             return 
Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getMessage())).build();
         }
-        
+
         FunctionDetails functionDetails;
         boolean isPkgUrlProvided = isNotBlank(functionPkgUrl);
         // validate parameters
@@ -267,7 +273,7 @@ public Response deregisterFunction(final String tenant, 
final String namespace,
             return 
Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getMessage())).build();
         }
-        
+
         // validate parameters
         try {
             validateDeregisterRequestParams(tenant, namespace, functionName);
@@ -511,9 +517,9 @@ public WorkerInfo getClusterLeader() {
         WorkerInfo leader = membershipManager.getLeader();
 
         if (leader == null) {
-            throw new WebApplicationException(
-                    
Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
-                            .entity(new ErrorData("Leader cannot be 
determined")).build());}
+            throw new 
WebApplicationException(Response.status(Status.INTERNAL_SERVER_ERROR)
+                    .type(MediaType.APPLICATION_JSON).entity(new 
ErrorData("Leader cannot be determined")).build());
+        }
 
         return leader;
     }
@@ -781,7 +787,6 @@ private String getFunctionCodeBuiltin(FunctionDetails 
functionDetails) {
         return null;
     }
 
-
     private FunctionDetails validateUpdateRequestParams(String tenant, String 
namespace, String functionName,
             String functionDetailsJson, File jarWithFileUrl) throws 
IllegalArgumentException {
         if (tenant == null) {
@@ -858,12 +863,12 @@ private void validateFunctionClassTypes(File jarFile, 
FunctionDetails.Builder fu
         // validate function class-type
         Object functionObject = 
createInstance(functionDetailsBuilder.getClassName(), classLoader);
         Class<?>[] typeArgs = 
org.apache.pulsar.functions.utils.Utils.getFunctionTypes(functionObject, false);
-        
+
         if (!(functionObject instanceof 
org.apache.pulsar.functions.api.Function)
                 && !(functionObject instanceof java.util.function.Function)) {
             throw new RuntimeException("User class must either be Function or 
java.util.Function");
         }
-        
+
         if (functionDetailsBuilder.hasSource() && 
functionDetailsBuilder.getSource() != null
                 && 
isNotBlank(functionDetailsBuilder.getSource().getClassName())) {
             try {
@@ -873,8 +878,7 @@ private void validateFunctionClassTypes(File jarFile, 
FunctionDetails.Builder fu
                         
.setSource(functionDetailsBuilder.getSourceBuilder().setTypeClassName(argClassName));
 
                 // if sink-class not present then set same arg as source
-                if (!functionDetailsBuilder.hasSink()
-                        || 
isBlank(functionDetailsBuilder.getSink().getClassName())) {
+                if (!functionDetailsBuilder.hasSink() || 
isBlank(functionDetailsBuilder.getSink().getClassName())) {
                     functionDetailsBuilder
                             
.setSink(functionDetailsBuilder.getSinkBuilder().setTypeClassName(argClassName));
                 }
@@ -899,8 +903,7 @@ private void validateFunctionClassTypes(File jarFile, 
FunctionDetails.Builder fu
                 
functionDetailsBuilder.setSink(functionDetailsBuilder.getSinkBuilder().setTypeClassName(argClassName));
 
                 // if source-class not present then set same arg as sink
-                if (!functionDetailsBuilder.hasSource()
-                        || 
isBlank(functionDetailsBuilder.getSource().getClassName())) {
+                if (!functionDetailsBuilder.hasSource() || 
isBlank(functionDetailsBuilder.getSource().getClassName())) {
                     functionDetailsBuilder
                             
.setSource(functionDetailsBuilder.getSourceBuilder().setTypeClassName(argClassName));
                 }
@@ -911,7 +914,7 @@ private void validateFunctionClassTypes(File jarFile, 
FunctionDetails.Builder fu
                 log.error("Failed to validate sink class", e);
                 throw new IllegalArgumentException("Failed to validate sink 
class-name", e);
             }
-        } else 
if(isBlank(functionDetailsBuilder.getSinkBuilder().getTypeClassName())){
+        } else if 
(isBlank(functionDetailsBuilder.getSinkBuilder().getTypeClassName())) {
             // if function-sink-class is not present then set function-sink 
type-class according to function class
             functionDetailsBuilder
                     
.setSink(functionDetailsBuilder.getSinkBuilder().setTypeClassName(typeArgs[1].getName()));
@@ -956,7 +959,7 @@ public static String createPackagePath(String tenant, 
String namespace, String f
         return String.format("%s/%s/%s/%s", tenant, namespace, 
Codec.encode(functionName),
                 Utils.getUniquePackageName(Codec.encode(fileName)));
     }
-    
+
     private boolean isAuthorizedRole(String tenant, String clientRole) throws 
PulsarAdminException {
         if (worker().getWorkerConfig().isAuthorizationEnabled()) {
             // skip authorization if client role is super-user
@@ -970,4 +973,53 @@ private boolean isAuthorizedRole(String tenant, String 
clientRole) throws Pulsar
         return true;
     }
 
+    public Response getMetrics() throws IOException {
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
+        WorkerService workerService = worker();
+        Map<String, FunctionRuntimeInfo> functionRuntimes = 
workerService.getFunctionRuntimeManager()
+                .getFunctionRuntimeInfos();
+
+        Metrics.Builder metricsBuilder = Metrics.newBuilder();
+        for (Map.Entry<String, FunctionRuntimeInfo> entry : 
functionRuntimes.entrySet()) {
+            String fullyQualifiedInstanceName = entry.getKey();
+            FunctionRuntimeInfo functionRuntimeInfo = entry.getValue();
+            RuntimeSpawner functionRuntimeSpawner = 
functionRuntimeInfo.getRuntimeSpawner();
+
+            if (functionRuntimeSpawner != null) {
+                Runtime functionRuntime = functionRuntimeSpawner.getRuntime();
+                if (functionRuntime != null) {
+                    try {
+                        InstanceCommunication.MetricsData metricsData = 
workerService.getWorkerConfig()
+                                .getMetricsSamplingPeriodSec() > 0 ? 
functionRuntime.getMetrics().get()
+                                        : 
functionRuntime.getAndResetMetrics().get();
+
+                        String tenant = 
functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
+                                .getFunctionDetails().getTenant();
+                        String namespace = 
functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
+                                .getFunctionDetails().getNamespace();
+                        String name = 
functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
+                                .getFunctionDetails().getName();
+                        int instanceId = 
functionRuntimeInfo.getFunctionInstance().getInstanceId();
+                        String qualifiedFunctionName = 
String.format("%s/%s/%s", tenant, namespace, name);
+
+                        InstanceMetrics.Builder instanceBuilder = 
InstanceMetrics.newBuilder();
+                        instanceBuilder.setName(qualifiedFunctionName);
+                        instanceBuilder.setInstanceId(instanceId);
+                        if (metricsData != null) {
+                            instanceBuilder.setMetricsData(metricsData);
+                        }
+                        metricsBuilder.addMetrics(instanceBuilder.build());
+                    } catch (InterruptedException | ExecutionException e) {
+                        log.warn("Failed to collect metrics for function 
instance {}", fullyQualifiedInstanceName, e);
+                    }
+                }
+            }
+        }
+        String jsonResponse = 
org.apache.pulsar.functions.utils.Utils.printJson(metricsBuilder);
+        return Response.status(Status.OK).entity(jsonResponse).build();
+    }
+
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index 96baada967..aafbb70681 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -181,4 +181,10 @@ public Response downloadFunction(final @QueryParam("path") 
String path) {
     public List<ConnectorDefinition> getConnectorsList() throws IOException {
         return functions.getListOfConnectors();
     }
+    
+    @GET
+    @Path("/metrics")
+    public Response getMetrics() throws IOException {
+        return functions.getMetrics();
+    }
 }
diff --git a/site2/website/scripts/replace.js b/site2/website/scripts/replace.js
index 2c83b03123..3af4af26a2 100644
--- a/site2/website/scripts/replace.js
+++ b/site2/website/scripts/replace.js
@@ -120,5 +120,4 @@ for (v of versions) {
     dry: true
   };
   doReplace(opts);
-}
-
+}
\ No newline at end of file


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to