This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b721ae3   Correcting metrics and adding tests (#3050)
b721ae3 is described below

commit b721ae3bcacc0d93c259d6f59465a0c0b0d402e0
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Mon Nov 26 09:45:10 2018 -0500

     Correcting metrics and adding tests (#3050)
    
    * Correcting metrics and adding tests
    
    * remove commented out code
    
    * remove space
    
    * fix integration test
    
    * improving impl
    
    * fix ObjectMapper
---
 .../pulsar/broker/admin/impl/FunctionsBase.java    |  3 +
 .../apache/pulsar/io/PulsarFunctionE2ETest.java    | 66 +++++++++++++++-
 .../pulsar/common/policies/data/FunctionStats.java | 64 +++++++++++-----
 .../pulsar/functions/runtime/JavaInstanceMain.java | 10 ---
 .../org/apache/pulsar/functions/worker/Utils.java  |  6 +-
 .../worker/rest/api/v2/FunctionApiV2Resource.java  |  3 +
 .../integration/functions/PulsarFunctionsTest.java | 88 ++++++++++++++++++++++
 7 files changed, 207 insertions(+), 33 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 84cf8e0..8a13a8a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -30,6 +30,7 @@ import javax.ws.rs.POST;
 import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
@@ -189,6 +190,7 @@ public class FunctionsBase extends AdminResource implements 
Supplier<WorkerServi
             @ApiResponse(code = 400, message = "Invalid request"),
             @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions")
     })
+    @Produces(MediaType.APPLICATION_JSON)
     @Path("/{tenant}/{namespace}/{functionName}/stats")
     public FunctionStats getFunctionStats(final @PathParam("tenant") String 
tenant,
                                      final @PathParam("namespace") String 
namespace,
@@ -205,6 +207,7 @@ public class FunctionsBase extends AdminResource implements 
Supplier<WorkerServi
             @ApiResponse(code = 400, message = "Invalid request"),
             @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions")
     })
+    @Produces(MediaType.APPLICATION_JSON)
     @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stats")
     public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData 
getFunctionInstanceStats(final @PathParam("tenant") String tenant,
                                              final @PathParam("namespace") 
String namespace,
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 6d89634..f9491f3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -353,6 +353,55 @@ public class PulsarFunctionE2ETest {
         // validate pulsar sink consumer has started on the topic
         
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
 
+        // validate stats are empty
+        FunctionRuntimeManager functionRuntimeManager = 
functionsWorkerService.getFunctionRuntimeManager();
+        FunctionStats functionStats = 
functionRuntimeManager.getFunctionStats(tenant, namespacePortion,
+                functionName, null);
+        FunctionStats functionStatsFromAdmin = 
admin.functions().getFunctionStats(tenant, namespacePortion,
+                functionName);
+
+        assertEquals(functionStats, functionStatsFromAdmin);
+
+        assertEquals(functionStats.getReceivedTotal(), 0);
+        assertEquals(functionStats.getProcessedSuccessfullyTotal(), 0);
+        assertEquals(functionStats.getSystemExceptionsTotal(), 0);
+        assertEquals(functionStats.getUserExceptionsTotal(), 0);
+        assertEquals(functionStats.avgProcessLatency, null);
+        assertEquals(functionStats.oneMin.getReceivedTotal(), 0);
+        assertEquals(functionStats.oneMin.getProcessedSuccessfullyTotal(), 0);
+        assertEquals(functionStats.oneMin.getSystemExceptionsTotal(), 0);
+        assertEquals(functionStats.oneMin.getUserExceptionsTotal(), 0);
+        assertEquals(functionStats.oneMin.getAvgProcessLatency(), null);
+        assertEquals(functionStats.getAvgProcessLatency(), 
functionStats.oneMin.getAvgProcessLatency());
+        assertEquals(functionStats.getLastInvocation(), null);
+
+        assertEquals(functionStats.instances.size(), 1);
+        assertEquals(functionStats.instances.get(0).getInstanceId(), 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().getReceivedTotal(), 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().getProcessedSuccessfullyTotal(),
 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().getSystemExceptionsTotal(),
 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().getUserExceptionsTotal(),
 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().avgProcessLatency, 
null);
+        
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getReceivedTotal(),
 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getProcessedSuccessfullyTotal(),
 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getSystemExceptionsTotal(),
 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getUserExceptionsTotal(),
 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getAvgProcessLatency(),
 null);
+
+        
assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(),
 functionStats.instances.get(0).getMetrics().oneMin.getAvgProcessLatency());
+        
assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(),
 functionStats.getAvgProcessLatency());
+
+        // validate function instance stats empty
+        FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData 
functionInstanceStats = functionRuntimeManager.getFunctionInstanceStats(tenant, 
namespacePortion,
+                functionName, 0,  null);
+
+        FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData 
functionInstanceStatsAdmin = admin.functions().getFunctionStats(tenant, 
namespacePortion,
+                functionName, 0);
+
+        assertEquals(functionInstanceStats, functionInstanceStatsAdmin);
+        assertEquals(functionInstanceStats, 
functionStats.instances.get(0).getMetrics());
+
+
         int totalMsgs = 10;
         for (int i = 0; i < totalMsgs; i++) {
             String data = "my-message-" + i;
@@ -367,11 +416,12 @@ public class PulsarFunctionE2ETest {
             }
         }, 5, 200);
 
-        FunctionRuntimeManager functionRuntimeManager = 
functionsWorkerService.getFunctionRuntimeManager();
-        FunctionStats functionStats = 
functionRuntimeManager.getFunctionStats(tenant, namespacePortion,
+
+        // get stats after producing
+        functionStats = functionRuntimeManager.getFunctionStats(tenant, 
namespacePortion,
                 functionName, null);
         
-        FunctionStats functionStatsFromAdmin = 
admin.functions().getFunctionStats(tenant, namespacePortion,
+        functionStatsFromAdmin = admin.functions().getFunctionStats(tenant, 
namespacePortion,
                 functionName);
 
         assertEquals(functionStats, functionStatsFromAdmin);
@@ -404,6 +454,16 @@ public class PulsarFunctionE2ETest {
 
         
assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(),
 functionStats.instances.get(0).getMetrics().oneMin.getAvgProcessLatency());
         
assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(),
 functionStats.getAvgProcessLatency());
+
+        // validate function instance stats
+        functionInstanceStats = 
functionRuntimeManager.getFunctionInstanceStats(tenant, namespacePortion,
+                functionName, 0,  null);
+
+        functionInstanceStatsAdmin = 
admin.functions().getFunctionStats(tenant, namespacePortion,
+                functionName, 0);
+
+        assertEquals(functionInstanceStats, functionInstanceStatsAdmin);
+        assertEquals(functionInstanceStats, 
functionStats.instances.get(0).getMetrics());
     }
 
     @Test(timeOut = 20000)
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java
index ba274c1..ff73272 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java
@@ -18,17 +18,20 @@
  */
 package org.apache.pulsar.common.policies.data;
 
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonPropertyOrder;
 import lombok.Data;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.function.Consumer;
 
 @Data
+@JsonInclude(JsonInclude.Include.ALWAYS)
 @JsonPropertyOrder({ "receivedTotal", "processedSuccessfullyTotal", 
"systemExceptionsTotal", "userExceptionsTotal", "avgProcessLatency", "1min", 
"lastInvocation", "instances" })
 public class FunctionStats {
 
@@ -55,7 +58,7 @@ public class FunctionStats {
     /**
      * Average process latency for function
      **/
-    public double avgProcessLatency;
+    public Double avgProcessLatency;
 
     @JsonProperty("1min")
     public FunctionInstanceStats.FunctionInstanceStatsDataBase oneMin = new 
FunctionInstanceStats.FunctionInstanceStatsDataBase();
@@ -63,9 +66,10 @@ public class FunctionStats {
     /**
      * Timestamp of when the function was last invoked by any instance
      **/
-    public long lastInvocation;
+    public Long lastInvocation;
 
     @Data
+    @JsonInclude(JsonInclude.Include.ALWAYS)
     @JsonPropertyOrder({ "instanceId", "metrics" })
     public static class FunctionInstanceStats {
 
@@ -73,6 +77,7 @@ public class FunctionStats {
         public int instanceId;
 
         @Data
+        @JsonInclude(JsonInclude.Include.ALWAYS)
         @JsonPropertyOrder({ "receivedTotal", "processedSuccessfullyTotal", 
"systemExceptionsTotal", "userExceptionsTotal", "avgProcessLatency" })
         public static class FunctionInstanceStatsDataBase {
             /**
@@ -98,10 +103,11 @@ public class FunctionStats {
             /**
              * Average process latency for function for instance
              **/
-            public double avgProcessLatency;
+            public Double avgProcessLatency;
         }
 
         @Data
+        @JsonInclude(JsonInclude.Include.ALWAYS)
         @JsonPropertyOrder({ "receivedTotal", "processedSuccessfullyTotal", 
"systemExceptionsTotal", "userExceptionsTotal", "avgProcessLatency", "1min", 
"lastInvocation", "userMetrics" })
         public static class FunctionInstanceStatsData extends 
FunctionInstanceStatsDataBase {
 
@@ -111,7 +117,7 @@ public class FunctionStats {
             /**
              * Timestamp of when the function was last invoked for instance
              **/
-            public long lastInvocation;
+            public Long lastInvocation;
 
             /**
              * Map of user defined metrics
@@ -130,35 +136,59 @@ public class FunctionStats {
 
     public FunctionStats calculateOverall() {
 
-        lastInvocation = 0;
-        instances.forEach(new Consumer<FunctionInstanceStats>() {
-            @Override
-            public void accept(FunctionInstanceStats functionInstanceStats) {
+        int nonNullInstances = 0;
+        int nonNullInstancesOneMin = 0;
+        for (FunctionInstanceStats functionInstanceStats : instances) {
                 FunctionInstanceStats.FunctionInstanceStatsData 
functionInstanceStatsData = functionInstanceStats.getMetrics();
                 receivedTotal += functionInstanceStatsData.receivedTotal;
                 processedSuccessfullyTotal += 
functionInstanceStatsData.processedSuccessfullyTotal;
                 systemExceptionsTotal += 
functionInstanceStatsData.systemExceptionsTotal;
                 userExceptionsTotal += 
functionInstanceStatsData.userExceptionsTotal;
-                avgProcessLatency += 
functionInstanceStatsData.avgProcessLatency;
+                if (functionInstanceStatsData.avgProcessLatency != null) {
+                    if (avgProcessLatency == null) {
+                        avgProcessLatency = 0.0;
+                    }
+                    avgProcessLatency += 
functionInstanceStatsData.avgProcessLatency;
+                    nonNullInstances ++;
+                }
 
                 oneMin.receivedTotal += 
functionInstanceStatsData.oneMin.receivedTotal;
                 oneMin.processedSuccessfullyTotal += 
functionInstanceStatsData.oneMin.processedSuccessfullyTotal;
                 oneMin.systemExceptionsTotal += 
functionInstanceStatsData.oneMin.systemExceptionsTotal;
                 oneMin.userExceptionsTotal += 
functionInstanceStatsData.oneMin.userExceptionsTotal;
-                oneMin.avgProcessLatency += 
functionInstanceStatsData.oneMin.avgProcessLatency;
-
-                if (functionInstanceStatsData.lastInvocation > lastInvocation) 
{
-                    lastInvocation = functionInstanceStatsData.lastInvocation;
+                if (functionInstanceStatsData.oneMin.avgProcessLatency != 
null) {
+                    if (oneMin.avgProcessLatency == null) {
+                        oneMin.avgProcessLatency = 0.0;
+                    }
+                    oneMin.avgProcessLatency += 
functionInstanceStatsData.oneMin.avgProcessLatency;
+                    nonNullInstancesOneMin ++;
                 }
 
+                if (functionInstanceStatsData.lastInvocation != null) {
+                    if (lastInvocation == null || 
functionInstanceStatsData.lastInvocation > lastInvocation) {
+                        lastInvocation = 
functionInstanceStatsData.lastInvocation;
+                    }
+                }
             }
-        });
+
         // calculate average from sum
-        avgProcessLatency = avgProcessLatency / instances.size();
+        if (nonNullInstances > 0) {
+            avgProcessLatency = avgProcessLatency / nonNullInstances;
+        } else {
+            avgProcessLatency = null;
+        }
 
         // calculate 1min average from sum
-        oneMin.avgProcessLatency = oneMin.avgProcessLatency / instances.size();
+        if (nonNullInstancesOneMin > 0) {
+            oneMin.avgProcessLatency = oneMin.avgProcessLatency / 
nonNullInstancesOneMin;
+        } else {
+            oneMin.avgProcessLatency = null;
+        }
 
         return this;
     }
+
+    public static FunctionStats decode (String json) throws IOException {
+        return ObjectMapperFactory.getThreadLocal().readValue(json, 
FunctionStats.class);
+    }
 }
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index d5fb80e..f0be511 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -29,17 +29,8 @@ import com.google.protobuf.util.JsonFormat;
 import io.grpc.Server;
 import io.grpc.ServerBuilder;
 import io.grpc.stub.StreamObserver;
-import io.prometheus.client.Collector;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.exporter.HTTPServer;
-import io.prometheus.client.hotspot.BufferPoolsExports;
-import io.prometheus.client.hotspot.ClassLoadingExports;
-import io.prometheus.client.hotspot.DefaultExports;
-import io.prometheus.client.hotspot.GarbageCollectorExports;
-import io.prometheus.client.hotspot.MemoryPoolsExports;
-import io.prometheus.client.hotspot.StandardExports;
-import io.prometheus.client.hotspot.ThreadExports;
-import io.prometheus.client.hotspot.VersionInfoExports;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
@@ -59,7 +50,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
 
 /**
  * A function container implemented using java thread.
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
index d6863a7..b3663c9 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
@@ -219,14 +219,14 @@ public final class Utils {
                     
functionInstanceStatsData.setProcessedSuccessfullyTotal(metricsData.getProcessedSuccessfullyTotal());
                     
functionInstanceStatsData.setSystemExceptionsTotal(metricsData.getSystemExceptionsTotal());
                     
functionInstanceStatsData.setUserExceptionsTotal(metricsData.getUserExceptionsTotal());
-                    
functionInstanceStatsData.setAvgProcessLatency(metricsData.getAvgProcessLatency());
-                    
functionInstanceStatsData.setLastInvocation(metricsData.getLastInvocation());
+                    
functionInstanceStatsData.setAvgProcessLatency(metricsData.getAvgProcessLatency()
 == 0.0 ? null : metricsData.getAvgProcessLatency());
+                    
functionInstanceStatsData.setLastInvocation(metricsData.getLastInvocation() == 
0 ? null : metricsData.getLastInvocation());
 
                     
functionInstanceStatsData.oneMin.setReceivedTotal(metricsData.getReceivedTotal1Min());
                     
functionInstanceStatsData.oneMin.setProcessedSuccessfullyTotal(metricsData.getProcessedSuccessfullyTotal1Min());
                     
functionInstanceStatsData.oneMin.setSystemExceptionsTotal(metricsData.getSystemExceptionsTotal1Min());
                     
functionInstanceStatsData.oneMin.setUserExceptionsTotal(metricsData.getUserExceptionsTotal1Min());
-                    
functionInstanceStatsData.oneMin.setAvgProcessLatency(metricsData.getAvgProcessLatency1Min());
+                    
functionInstanceStatsData.oneMin.setAvgProcessLatency(metricsData.getAvgProcessLatency1Min()
 == 0.0 ? null : metricsData.getAvgProcessLatency1Min());
 
                     // Filter out values that are NaN
                     Map<String, Double> statsDataMap = 
metricsData.getUserMetricsMap().entrySet().stream()
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index ea23d26..2961afe 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -36,6 +36,7 @@ import javax.ws.rs.POST;
 import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
@@ -128,6 +129,7 @@ public class FunctionApiV2Resource extends 
FunctionApiResource {
             @ApiResponse(code = 400, message = "Invalid request"),
             @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions")
     })
+    @Produces(MediaType.APPLICATION_JSON)
     @Path("/{tenant}/{namespace}/{functionName}/stats")
     public FunctionStats getFunctionStats(final @PathParam("tenant") String 
tenant,
                                      final @PathParam("namespace") String 
namespace,
@@ -144,6 +146,7 @@ public class FunctionApiV2Resource extends 
FunctionApiResource {
             @ApiResponse(code = 400, message = "Invalid request"),
             @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions")
     })
+    @Produces(MediaType.APPLICATION_JSON)
     @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stats")
     public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData 
getFunctionInstanceStats(final @PathParam("tenant") String tenant,
                                                                                
                   final @PathParam("namespace") String namespace,
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 5f6f7b1..0bfeb90 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -23,6 +23,7 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Stopwatch;
 import com.google.gson.Gson;
 
@@ -42,6 +43,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.functions.api.examples.AutoSchemaFunction;
 import org.apache.pulsar.functions.api.examples.serde.CustomObject;
 import org.apache.pulsar.tests.integration.docker.ContainerExecException;
@@ -686,6 +688,9 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
         // get function info
         getFunctionInfoSuccess(functionName);
 
+        // get function stats
+        getFunctionStatsEmpty(functionName);
+
         // publish and consume result
         if (Runtime.JAVA == runtime) {
             // java supports schema
@@ -698,6 +703,9 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
         // get function status
         getFunctionStatus(functionName, numMessages);
 
+        // get function stats
+        getFunctionStats(functionName, numMessages);
+
         // delete function
         deleteFunction(functionName);
 
@@ -800,6 +808,86 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
         assertTrue(result.getStdout().contains("\"name\": \"" + functionName + 
"\""));
     }
 
+    private static void getFunctionStatsEmpty(String functionName) throws 
Exception {
+        ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
+                PulsarCluster.ADMIN_SCRIPT,
+                "functions",
+                "stats",
+                "--tenant", "public",
+                "--namespace", "default",
+                "--name", functionName
+        );
+
+        log.info("FUNCTION STATS: {}", result.getStdout());
+        FunctionStats functionStats = FunctionStats.decode(result.getStdout());
+
+        assertEquals(functionStats.getReceivedTotal(), 0);
+        assertEquals(functionStats.getProcessedSuccessfullyTotal(), 0);
+        assertEquals(functionStats.getSystemExceptionsTotal(), 0);
+        assertEquals(functionStats.getUserExceptionsTotal(), 0);
+        assertEquals(functionStats.avgProcessLatency, null);
+        assertEquals(functionStats.oneMin.getReceivedTotal(), 0);
+        assertEquals(functionStats.oneMin.getProcessedSuccessfullyTotal(), 0);
+        assertEquals(functionStats.oneMin.getSystemExceptionsTotal(), 0);
+        assertEquals(functionStats.oneMin.getUserExceptionsTotal(), 0);
+        assertEquals(functionStats.oneMin.getAvgProcessLatency(), null);
+        assertEquals(functionStats.getAvgProcessLatency(), 
functionStats.oneMin.getAvgProcessLatency());
+        assertEquals(functionStats.getLastInvocation(), null);
+
+        assertEquals(functionStats.instances.size(), 1);
+        assertEquals(functionStats.instances.get(0).getInstanceId(), 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().getReceivedTotal(), 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().getProcessedSuccessfullyTotal(),
 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().getSystemExceptionsTotal(),
 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().getUserExceptionsTotal(),
 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().avgProcessLatency, 
null);
+        
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getReceivedTotal(),
 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getProcessedSuccessfullyTotal(),
 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getSystemExceptionsTotal(),
 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getUserExceptionsTotal(),
 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getAvgProcessLatency(),
 null);
+    }
+
+    private static void getFunctionStats(String functionName, int numMessages) 
throws Exception {
+        ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
+                PulsarCluster.ADMIN_SCRIPT,
+                "functions",
+                "stats",
+                "--tenant", "public",
+                "--namespace", "default",
+                "--name", functionName
+        );
+
+        log.info("FUNCTION STATS: {}", result.getStdout());
+
+        FunctionStats functionStats = FunctionStats.decode(result.getStdout());
+        assertEquals(functionStats.getReceivedTotal(), numMessages);
+        assertEquals(functionStats.getProcessedSuccessfullyTotal(), 
numMessages);
+        assertEquals(functionStats.getSystemExceptionsTotal(), 0);
+        assertEquals(functionStats.getUserExceptionsTotal(), 0);
+        assertTrue(functionStats.avgProcessLatency > 0);
+        assertEquals(functionStats.oneMin.getReceivedTotal(), numMessages);
+        assertEquals(functionStats.oneMin.getProcessedSuccessfullyTotal(), 
numMessages);
+        assertEquals(functionStats.oneMin.getSystemExceptionsTotal(), 0);
+        assertEquals(functionStats.oneMin.getUserExceptionsTotal(), 0);
+        assertTrue(functionStats.oneMin.getAvgProcessLatency() > 0);
+        assertEquals(functionStats.getAvgProcessLatency(), 
functionStats.oneMin.getAvgProcessLatency());
+        assertTrue(functionStats.getLastInvocation() > 0);
+
+        assertEquals(functionStats.instances.size(), 1);
+        assertEquals(functionStats.instances.get(0).getInstanceId(), 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().getReceivedTotal(), 
numMessages);
+        
assertEquals(functionStats.instances.get(0).getMetrics().getProcessedSuccessfullyTotal(),
 numMessages);
+        
assertEquals(functionStats.instances.get(0).getMetrics().getSystemExceptionsTotal(),
 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().getUserExceptionsTotal(),
 0);
+        
assertTrue(functionStats.instances.get(0).getMetrics().avgProcessLatency > 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getReceivedTotal(),
 numMessages);
+        
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getProcessedSuccessfullyTotal(),
 numMessages);
+        
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getSystemExceptionsTotal(),
 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getUserExceptionsTotal(),
 0);
+        
assertTrue(functionStats.instances.get(0).getMetrics().oneMin.getAvgProcessLatency()
 > 0);
+    }
+
     private static void getFunctionInfoNotFound(String functionName) throws 
Exception {
         try {
             pulsarCluster.getAnyWorker().execCmd(

Reply via email to