jerrypeng closed pull request #3050:  Correcting metrics and adding tests
URL: https://github.com/apache/pulsar/pull/3050
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
from a fork) once it is merged, the diff is reproduced below for the
sake of provenance:

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 84cf8e0eae..8a13a8ab3c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -30,6 +30,7 @@
 import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
@@ -189,6 +190,7 @@ public Response getFunctionStatus(final 
@PathParam("tenant") String tenant,
             @ApiResponse(code = 400, message = "Invalid request"),
             @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions")
     })
+    @Produces(MediaType.APPLICATION_JSON)
     @Path("/{tenant}/{namespace}/{functionName}/stats")
     public FunctionStats getFunctionStats(final @PathParam("tenant") String 
tenant,
                                      final @PathParam("namespace") String 
namespace,
@@ -205,6 +207,7 @@ public FunctionStats getFunctionStats(final 
@PathParam("tenant") String tenant,
             @ApiResponse(code = 400, message = "Invalid request"),
             @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions")
     })
+    @Produces(MediaType.APPLICATION_JSON)
     @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stats")
     public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData 
getFunctionInstanceStats(final @PathParam("tenant") String tenant,
                                              final @PathParam("namespace") 
String namespace,
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 4dbdf3b250..5f6de32474 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -344,6 +344,55 @@ public void testPulsarFunctionStats() throws Exception {
         // validate pulsar sink consumer has started on the topic
         
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
 
+        // validate stats are empty
+        FunctionRuntimeManager functionRuntimeManager = 
functionsWorkerService.getFunctionRuntimeManager();
+        FunctionStats functionStats = 
functionRuntimeManager.getFunctionStats(tenant, namespacePortion,
+                functionName, null);
+        FunctionStats functionStatsFromAdmin = 
admin.functions().getFunctionStats(tenant, namespacePortion,
+                functionName);
+
+        assertEquals(functionStats, functionStatsFromAdmin);
+
+        assertEquals(functionStats.getReceivedTotal(), 0);
+        assertEquals(functionStats.getProcessedSuccessfullyTotal(), 0);
+        assertEquals(functionStats.getSystemExceptionsTotal(), 0);
+        assertEquals(functionStats.getUserExceptionsTotal(), 0);
+        assertEquals(functionStats.avgProcessLatency, null);
+        assertEquals(functionStats.oneMin.getReceivedTotal(), 0);
+        assertEquals(functionStats.oneMin.getProcessedSuccessfullyTotal(), 0);
+        assertEquals(functionStats.oneMin.getSystemExceptionsTotal(), 0);
+        assertEquals(functionStats.oneMin.getUserExceptionsTotal(), 0);
+        assertEquals(functionStats.oneMin.getAvgProcessLatency(), null);
+        assertEquals(functionStats.getAvgProcessLatency(), 
functionStats.oneMin.getAvgProcessLatency());
+        assertEquals(functionStats.getLastInvocation(), null);
+
+        assertEquals(functionStats.instances.size(), 1);
+        assertEquals(functionStats.instances.get(0).getInstanceId(), 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().getReceivedTotal(), 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().getProcessedSuccessfullyTotal(),
 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().getSystemExceptionsTotal(),
 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().getUserExceptionsTotal(),
 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().avgProcessLatency, 
null);
+        
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getReceivedTotal(),
 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getProcessedSuccessfullyTotal(),
 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getSystemExceptionsTotal(),
 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getUserExceptionsTotal(),
 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getAvgProcessLatency(),
 null);
+
+        
assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(),
 functionStats.instances.get(0).getMetrics().oneMin.getAvgProcessLatency());
+        
assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(),
 functionStats.getAvgProcessLatency());
+
+        // validate function instance stats empty
+        FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData 
functionInstanceStats = functionRuntimeManager.getFunctionInstanceStats(tenant, 
namespacePortion,
+                functionName, 0,  null);
+
+        FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData 
functionInstanceStatsAdmin = admin.functions().getFunctionStats(tenant, 
namespacePortion,
+                functionName, 0);
+
+        assertEquals(functionInstanceStats, functionInstanceStatsAdmin);
+        assertEquals(functionInstanceStats, 
functionStats.instances.get(0).getMetrics());
+
+
         int totalMsgs = 10;
         for (int i = 0; i < totalMsgs; i++) {
             String data = "my-message-" + i;
@@ -358,11 +407,12 @@ public void testPulsarFunctionStats() throws Exception {
             }
         }, 5, 200);
 
-        FunctionRuntimeManager functionRuntimeManager = 
functionsWorkerService.getFunctionRuntimeManager();
-        FunctionStats functionStats = 
functionRuntimeManager.getFunctionStats(tenant, namespacePortion,
+
+        // get stats after producing
+        functionStats = functionRuntimeManager.getFunctionStats(tenant, 
namespacePortion,
                 functionName, null);
         
-        FunctionStats functionStatsFromAdmin = 
admin.functions().getFunctionStats(tenant, namespacePortion,
+        functionStatsFromAdmin = admin.functions().getFunctionStats(tenant, 
namespacePortion,
                 functionName);
 
         assertEquals(functionStats, functionStatsFromAdmin);
@@ -395,6 +445,16 @@ public void testPulsarFunctionStats() throws Exception {
 
         
assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(),
 functionStats.instances.get(0).getMetrics().oneMin.getAvgProcessLatency());
         
assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(),
 functionStats.getAvgProcessLatency());
+
+        // validate function instance stats
+        functionInstanceStats = 
functionRuntimeManager.getFunctionInstanceStats(tenant, namespacePortion,
+                functionName, 0,  null);
+
+        functionInstanceStatsAdmin = 
admin.functions().getFunctionStats(tenant, namespacePortion,
+                functionName, 0);
+
+        assertEquals(functionInstanceStats, functionInstanceStatsAdmin);
+        assertEquals(functionInstanceStats, 
functionStats.instances.get(0).getMetrics());
     }
 
     @Test(timeOut = 20000)
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java
index ba274c1077..ff7327289a 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java
@@ -18,17 +18,20 @@
  */
 package org.apache.pulsar.common.policies.data;
 
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonPropertyOrder;
 import lombok.Data;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.function.Consumer;
 
 @Data
+@JsonInclude(JsonInclude.Include.ALWAYS)
 @JsonPropertyOrder({ "receivedTotal", "processedSuccessfullyTotal", 
"systemExceptionsTotal", "userExceptionsTotal", "avgProcessLatency", "1min", 
"lastInvocation", "instances" })
 public class FunctionStats {
 
@@ -55,7 +58,7 @@
     /**
      * Average process latency for function
      **/
-    public double avgProcessLatency;
+    public Double avgProcessLatency;
 
     @JsonProperty("1min")
     public FunctionInstanceStats.FunctionInstanceStatsDataBase oneMin = new 
FunctionInstanceStats.FunctionInstanceStatsDataBase();
@@ -63,9 +66,10 @@
     /**
      * Timestamp of when the function was last invoked by any instance
      **/
-    public long lastInvocation;
+    public Long lastInvocation;
 
     @Data
+    @JsonInclude(JsonInclude.Include.ALWAYS)
     @JsonPropertyOrder({ "instanceId", "metrics" })
     public static class FunctionInstanceStats {
 
@@ -73,6 +77,7 @@
         public int instanceId;
 
         @Data
+        @JsonInclude(JsonInclude.Include.ALWAYS)
         @JsonPropertyOrder({ "receivedTotal", "processedSuccessfullyTotal", 
"systemExceptionsTotal", "userExceptionsTotal", "avgProcessLatency" })
         public static class FunctionInstanceStatsDataBase {
             /**
@@ -98,10 +103,11 @@
             /**
              * Average process latency for function for instance
              **/
-            public double avgProcessLatency;
+            public Double avgProcessLatency;
         }
 
         @Data
+        @JsonInclude(JsonInclude.Include.ALWAYS)
         @JsonPropertyOrder({ "receivedTotal", "processedSuccessfullyTotal", 
"systemExceptionsTotal", "userExceptionsTotal", "avgProcessLatency", "1min", 
"lastInvocation", "userMetrics" })
         public static class FunctionInstanceStatsData extends 
FunctionInstanceStatsDataBase {
 
@@ -111,7 +117,7 @@
             /**
              * Timestamp of when the function was last invoked for instance
              **/
-            public long lastInvocation;
+            public Long lastInvocation;
 
             /**
              * Map of user defined metrics
@@ -130,35 +136,59 @@ public void addInstance(FunctionInstanceStats 
functionInstanceStats) {
 
     public FunctionStats calculateOverall() {
 
-        lastInvocation = 0;
-        instances.forEach(new Consumer<FunctionInstanceStats>() {
-            @Override
-            public void accept(FunctionInstanceStats functionInstanceStats) {
+        int nonNullInstances = 0;
+        int nonNullInstancesOneMin = 0;
+        for (FunctionInstanceStats functionInstanceStats : instances) {
                 FunctionInstanceStats.FunctionInstanceStatsData 
functionInstanceStatsData = functionInstanceStats.getMetrics();
                 receivedTotal += functionInstanceStatsData.receivedTotal;
                 processedSuccessfullyTotal += 
functionInstanceStatsData.processedSuccessfullyTotal;
                 systemExceptionsTotal += 
functionInstanceStatsData.systemExceptionsTotal;
                 userExceptionsTotal += 
functionInstanceStatsData.userExceptionsTotal;
-                avgProcessLatency += 
functionInstanceStatsData.avgProcessLatency;
+                if (functionInstanceStatsData.avgProcessLatency != null) {
+                    if (avgProcessLatency == null) {
+                        avgProcessLatency = 0.0;
+                    }
+                    avgProcessLatency += 
functionInstanceStatsData.avgProcessLatency;
+                    nonNullInstances ++;
+                }
 
                 oneMin.receivedTotal += 
functionInstanceStatsData.oneMin.receivedTotal;
                 oneMin.processedSuccessfullyTotal += 
functionInstanceStatsData.oneMin.processedSuccessfullyTotal;
                 oneMin.systemExceptionsTotal += 
functionInstanceStatsData.oneMin.systemExceptionsTotal;
                 oneMin.userExceptionsTotal += 
functionInstanceStatsData.oneMin.userExceptionsTotal;
-                oneMin.avgProcessLatency += 
functionInstanceStatsData.oneMin.avgProcessLatency;
-
-                if (functionInstanceStatsData.lastInvocation > lastInvocation) 
{
-                    lastInvocation = functionInstanceStatsData.lastInvocation;
+                if (functionInstanceStatsData.oneMin.avgProcessLatency != 
null) {
+                    if (oneMin.avgProcessLatency == null) {
+                        oneMin.avgProcessLatency = 0.0;
+                    }
+                    oneMin.avgProcessLatency += 
functionInstanceStatsData.oneMin.avgProcessLatency;
+                    nonNullInstancesOneMin ++;
                 }
 
+                if (functionInstanceStatsData.lastInvocation != null) {
+                    if (lastInvocation == null || 
functionInstanceStatsData.lastInvocation > lastInvocation) {
+                        lastInvocation = 
functionInstanceStatsData.lastInvocation;
+                    }
+                }
             }
-        });
+
         // calculate average from sum
-        avgProcessLatency = avgProcessLatency / instances.size();
+        if (nonNullInstances > 0) {
+            avgProcessLatency = avgProcessLatency / nonNullInstances;
+        } else {
+            avgProcessLatency = null;
+        }
 
         // calculate 1min average from sum
-        oneMin.avgProcessLatency = oneMin.avgProcessLatency / instances.size();
+        if (nonNullInstancesOneMin > 0) {
+            oneMin.avgProcessLatency = oneMin.avgProcessLatency / 
nonNullInstancesOneMin;
+        } else {
+            oneMin.avgProcessLatency = null;
+        }
 
         return this;
     }
+
+    public static FunctionStats decode (String json) throws IOException {
+        return ObjectMapperFactory.getThreadLocal().readValue(json, 
FunctionStats.class);
+    }
 }
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index d5fb80ee73..f0be5119bc 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -29,17 +29,8 @@
 import io.grpc.Server;
 import io.grpc.ServerBuilder;
 import io.grpc.stub.StreamObserver;
-import io.prometheus.client.Collector;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.exporter.HTTPServer;
-import io.prometheus.client.hotspot.BufferPoolsExports;
-import io.prometheus.client.hotspot.ClassLoadingExports;
-import io.prometheus.client.hotspot.DefaultExports;
-import io.prometheus.client.hotspot.GarbageCollectorExports;
-import io.prometheus.client.hotspot.MemoryPoolsExports;
-import io.prometheus.client.hotspot.StandardExports;
-import io.prometheus.client.hotspot.ThreadExports;
-import io.prometheus.client.hotspot.VersionInfoExports;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
@@ -59,7 +50,6 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
 
 /**
  * A function container implemented using java thread.
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
index d6863a7c6f..b3663c90fd 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
@@ -219,14 +219,14 @@ public static PulsarAdmin getPulsarAdminClient(String 
pulsarWebServiceUrl, Strin
                     
functionInstanceStatsData.setProcessedSuccessfullyTotal(metricsData.getProcessedSuccessfullyTotal());
                     
functionInstanceStatsData.setSystemExceptionsTotal(metricsData.getSystemExceptionsTotal());
                     
functionInstanceStatsData.setUserExceptionsTotal(metricsData.getUserExceptionsTotal());
-                    
functionInstanceStatsData.setAvgProcessLatency(metricsData.getAvgProcessLatency());
-                    
functionInstanceStatsData.setLastInvocation(metricsData.getLastInvocation());
+                    
functionInstanceStatsData.setAvgProcessLatency(metricsData.getAvgProcessLatency()
 == 0.0 ? null : metricsData.getAvgProcessLatency());
+                    
functionInstanceStatsData.setLastInvocation(metricsData.getLastInvocation() == 
0 ? null : metricsData.getLastInvocation());
 
                     
functionInstanceStatsData.oneMin.setReceivedTotal(metricsData.getReceivedTotal1Min());
                     
functionInstanceStatsData.oneMin.setProcessedSuccessfullyTotal(metricsData.getProcessedSuccessfullyTotal1Min());
                     
functionInstanceStatsData.oneMin.setSystemExceptionsTotal(metricsData.getSystemExceptionsTotal1Min());
                     
functionInstanceStatsData.oneMin.setUserExceptionsTotal(metricsData.getUserExceptionsTotal1Min());
-                    
functionInstanceStatsData.oneMin.setAvgProcessLatency(metricsData.getAvgProcessLatency1Min());
+                    
functionInstanceStatsData.oneMin.setAvgProcessLatency(metricsData.getAvgProcessLatency1Min()
 == 0.0 ? null : metricsData.getAvgProcessLatency1Min());
 
                     // Filter out values that are NaN
                     Map<String, Double> statsDataMap = 
metricsData.getUserMetricsMap().entrySet().stream()
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index ea23d26257..2961afe0e3 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -36,6 +36,7 @@
 import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
@@ -128,6 +129,7 @@ public Response getFunctionStatus(final 
@PathParam("tenant") String tenant,
             @ApiResponse(code = 400, message = "Invalid request"),
             @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions")
     })
+    @Produces(MediaType.APPLICATION_JSON)
     @Path("/{tenant}/{namespace}/{functionName}/stats")
     public FunctionStats getFunctionStats(final @PathParam("tenant") String 
tenant,
                                      final @PathParam("namespace") String 
namespace,
@@ -144,6 +146,7 @@ public FunctionStats getFunctionStats(final 
@PathParam("tenant") String tenant,
             @ApiResponse(code = 400, message = "Invalid request"),
             @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions")
     })
+    @Produces(MediaType.APPLICATION_JSON)
     @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stats")
     public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData 
getFunctionInstanceStats(final @PathParam("tenant") String tenant,
                                                                                
                   final @PathParam("namespace") String namespace,
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 5f6f7b19ae..0bfeb90882 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -23,6 +23,7 @@
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Stopwatch;
 import com.google.gson.Gson;
 
@@ -42,6 +43,7 @@
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.functions.api.examples.AutoSchemaFunction;
 import org.apache.pulsar.functions.api.examples.serde.CustomObject;
 import org.apache.pulsar.tests.integration.docker.ContainerExecException;
@@ -686,6 +688,9 @@ private void testExclamationFunction(Runtime runtime,
         // get function info
         getFunctionInfoSuccess(functionName);
 
+        // get function stats
+        getFunctionStatsEmpty(functionName);
+
         // publish and consume result
         if (Runtime.JAVA == runtime) {
             // java supports schema
@@ -698,6 +703,9 @@ private void testExclamationFunction(Runtime runtime,
         // get function status
         getFunctionStatus(functionName, numMessages);
 
+        // get function stats
+        getFunctionStats(functionName, numMessages);
+
         // delete function
         deleteFunction(functionName);
 
@@ -800,6 +808,86 @@ private static void getFunctionInfoSuccess(String 
functionName) throws Exception
         assertTrue(result.getStdout().contains("\"name\": \"" + functionName + 
"\""));
     }
 
+    private static void getFunctionStatsEmpty(String functionName) throws 
Exception {
+        ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
+                PulsarCluster.ADMIN_SCRIPT,
+                "functions",
+                "stats",
+                "--tenant", "public",
+                "--namespace", "default",
+                "--name", functionName
+        );
+
+        log.info("FUNCTION STATS: {}", result.getStdout());
+        FunctionStats functionStats = FunctionStats.decode(result.getStdout());
+
+        assertEquals(functionStats.getReceivedTotal(), 0);
+        assertEquals(functionStats.getProcessedSuccessfullyTotal(), 0);
+        assertEquals(functionStats.getSystemExceptionsTotal(), 0);
+        assertEquals(functionStats.getUserExceptionsTotal(), 0);
+        assertEquals(functionStats.avgProcessLatency, null);
+        assertEquals(functionStats.oneMin.getReceivedTotal(), 0);
+        assertEquals(functionStats.oneMin.getProcessedSuccessfullyTotal(), 0);
+        assertEquals(functionStats.oneMin.getSystemExceptionsTotal(), 0);
+        assertEquals(functionStats.oneMin.getUserExceptionsTotal(), 0);
+        assertEquals(functionStats.oneMin.getAvgProcessLatency(), null);
+        assertEquals(functionStats.getAvgProcessLatency(), 
functionStats.oneMin.getAvgProcessLatency());
+        assertEquals(functionStats.getLastInvocation(), null);
+
+        assertEquals(functionStats.instances.size(), 1);
+        assertEquals(functionStats.instances.get(0).getInstanceId(), 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().getReceivedTotal(), 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().getProcessedSuccessfullyTotal(),
 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().getSystemExceptionsTotal(),
 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().getUserExceptionsTotal(),
 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().avgProcessLatency, 
null);
+        
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getReceivedTotal(),
 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getProcessedSuccessfullyTotal(),
 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getSystemExceptionsTotal(),
 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getUserExceptionsTotal(),
 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getAvgProcessLatency(),
 null);
+    }
+
+    private static void getFunctionStats(String functionName, int numMessages) 
throws Exception {
+        ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
+                PulsarCluster.ADMIN_SCRIPT,
+                "functions",
+                "stats",
+                "--tenant", "public",
+                "--namespace", "default",
+                "--name", functionName
+        );
+
+        log.info("FUNCTION STATS: {}", result.getStdout());
+
+        FunctionStats functionStats = FunctionStats.decode(result.getStdout());
+        assertEquals(functionStats.getReceivedTotal(), numMessages);
+        assertEquals(functionStats.getProcessedSuccessfullyTotal(), 
numMessages);
+        assertEquals(functionStats.getSystemExceptionsTotal(), 0);
+        assertEquals(functionStats.getUserExceptionsTotal(), 0);
+        assertTrue(functionStats.avgProcessLatency > 0);
+        assertEquals(functionStats.oneMin.getReceivedTotal(), numMessages);
+        assertEquals(functionStats.oneMin.getProcessedSuccessfullyTotal(), 
numMessages);
+        assertEquals(functionStats.oneMin.getSystemExceptionsTotal(), 0);
+        assertEquals(functionStats.oneMin.getUserExceptionsTotal(), 0);
+        assertTrue(functionStats.oneMin.getAvgProcessLatency() > 0);
+        assertEquals(functionStats.getAvgProcessLatency(), 
functionStats.oneMin.getAvgProcessLatency());
+        assertTrue(functionStats.getLastInvocation() > 0);
+
+        assertEquals(functionStats.instances.size(), 1);
+        assertEquals(functionStats.instances.get(0).getInstanceId(), 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().getReceivedTotal(), 
numMessages);
+        
assertEquals(functionStats.instances.get(0).getMetrics().getProcessedSuccessfullyTotal(),
 numMessages);
+        
assertEquals(functionStats.instances.get(0).getMetrics().getSystemExceptionsTotal(),
 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().getUserExceptionsTotal(),
 0);
+        
assertTrue(functionStats.instances.get(0).getMetrics().avgProcessLatency > 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getReceivedTotal(),
 numMessages);
+        
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getProcessedSuccessfullyTotal(),
 numMessages);
+        
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getSystemExceptionsTotal(),
 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getUserExceptionsTotal(),
 0);
+        
assertTrue(functionStats.instances.get(0).getMetrics().oneMin.getAvgProcessLatency()
 > 0);
+    }
+
     private static void getFunctionInfoNotFound(String functionName) throws 
Exception {
         try {
             pulsarCluster.getAnyWorker().execCmd(


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to