jerrypeng closed pull request #3050: Correcting metrics and adding tests
URL: https://github.com/apache/pulsar/pull/3050
This is a foreign pull request, i.e. one merged from a forked repository.
Because GitHub does not show the original diff of such a pull request once
it has been merged, the diff is reproduced below for the sake of provenance:
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 84cf8e0eae..8a13a8ab3c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -30,6 +30,7 @@
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
@@ -189,6 +190,7 @@ public Response getFunctionStatus(final
@PathParam("tenant") String tenant,
@ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 403, message = "The requester doesn't have
admin permissions")
})
+ @Produces(MediaType.APPLICATION_JSON)
@Path("/{tenant}/{namespace}/{functionName}/stats")
public FunctionStats getFunctionStats(final @PathParam("tenant") String
tenant,
final @PathParam("namespace") String
namespace,
@@ -205,6 +207,7 @@ public FunctionStats getFunctionStats(final
@PathParam("tenant") String tenant,
@ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 403, message = "The requester doesn't have
admin permissions")
})
+ @Produces(MediaType.APPLICATION_JSON)
@Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stats")
public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData
getFunctionInstanceStats(final @PathParam("tenant") String tenant,
final @PathParam("namespace")
String namespace,
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 4dbdf3b250..5f6de32474 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -344,6 +344,55 @@ public void testPulsarFunctionStats() throws Exception {
// validate pulsar sink consumer has started on the topic
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
+ // validate stats are empty
+ FunctionRuntimeManager functionRuntimeManager =
functionsWorkerService.getFunctionRuntimeManager();
+ FunctionStats functionStats =
functionRuntimeManager.getFunctionStats(tenant, namespacePortion,
+ functionName, null);
+ FunctionStats functionStatsFromAdmin =
admin.functions().getFunctionStats(tenant, namespacePortion,
+ functionName);
+
+ assertEquals(functionStats, functionStatsFromAdmin);
+
+ assertEquals(functionStats.getReceivedTotal(), 0);
+ assertEquals(functionStats.getProcessedSuccessfullyTotal(), 0);
+ assertEquals(functionStats.getSystemExceptionsTotal(), 0);
+ assertEquals(functionStats.getUserExceptionsTotal(), 0);
+ assertEquals(functionStats.avgProcessLatency, null);
+ assertEquals(functionStats.oneMin.getReceivedTotal(), 0);
+ assertEquals(functionStats.oneMin.getProcessedSuccessfullyTotal(), 0);
+ assertEquals(functionStats.oneMin.getSystemExceptionsTotal(), 0);
+ assertEquals(functionStats.oneMin.getUserExceptionsTotal(), 0);
+ assertEquals(functionStats.oneMin.getAvgProcessLatency(), null);
+ assertEquals(functionStats.getAvgProcessLatency(),
functionStats.oneMin.getAvgProcessLatency());
+ assertEquals(functionStats.getLastInvocation(), null);
+
+ assertEquals(functionStats.instances.size(), 1);
+ assertEquals(functionStats.instances.get(0).getInstanceId(), 0);
+
assertEquals(functionStats.instances.get(0).getMetrics().getReceivedTotal(), 0);
+
assertEquals(functionStats.instances.get(0).getMetrics().getProcessedSuccessfullyTotal(),
0);
+
assertEquals(functionStats.instances.get(0).getMetrics().getSystemExceptionsTotal(),
0);
+
assertEquals(functionStats.instances.get(0).getMetrics().getUserExceptionsTotal(),
0);
+
assertEquals(functionStats.instances.get(0).getMetrics().avgProcessLatency,
null);
+
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getReceivedTotal(),
0);
+
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getProcessedSuccessfullyTotal(),
0);
+
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getSystemExceptionsTotal(),
0);
+
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getUserExceptionsTotal(),
0);
+
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getAvgProcessLatency(),
null);
+
+
assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(),
functionStats.instances.get(0).getMetrics().oneMin.getAvgProcessLatency());
+
assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(),
functionStats.getAvgProcessLatency());
+
+ // validate function instance stats empty
+ FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData
functionInstanceStats = functionRuntimeManager.getFunctionInstanceStats(tenant,
namespacePortion,
+ functionName, 0, null);
+
+ FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData
functionInstanceStatsAdmin = admin.functions().getFunctionStats(tenant,
namespacePortion,
+ functionName, 0);
+
+ assertEquals(functionInstanceStats, functionInstanceStatsAdmin);
+ assertEquals(functionInstanceStats,
functionStats.instances.get(0).getMetrics());
+
+
int totalMsgs = 10;
for (int i = 0; i < totalMsgs; i++) {
String data = "my-message-" + i;
@@ -358,11 +407,12 @@ public void testPulsarFunctionStats() throws Exception {
}
}, 5, 200);
- FunctionRuntimeManager functionRuntimeManager =
functionsWorkerService.getFunctionRuntimeManager();
- FunctionStats functionStats =
functionRuntimeManager.getFunctionStats(tenant, namespacePortion,
+
+ // get stats after producing
+ functionStats = functionRuntimeManager.getFunctionStats(tenant,
namespacePortion,
functionName, null);
- FunctionStats functionStatsFromAdmin =
admin.functions().getFunctionStats(tenant, namespacePortion,
+ functionStatsFromAdmin = admin.functions().getFunctionStats(tenant,
namespacePortion,
functionName);
assertEquals(functionStats, functionStatsFromAdmin);
@@ -395,6 +445,16 @@ public void testPulsarFunctionStats() throws Exception {
assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(),
functionStats.instances.get(0).getMetrics().oneMin.getAvgProcessLatency());
assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(),
functionStats.getAvgProcessLatency());
+
+ // validate function instance stats
+ functionInstanceStats =
functionRuntimeManager.getFunctionInstanceStats(tenant, namespacePortion,
+ functionName, 0, null);
+
+ functionInstanceStatsAdmin =
admin.functions().getFunctionStats(tenant, namespacePortion,
+ functionName, 0);
+
+ assertEquals(functionInstanceStats, functionInstanceStatsAdmin);
+ assertEquals(functionInstanceStats,
functionStats.instances.get(0).getMetrics());
}
@Test(timeOut = 20000)
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java
index ba274c1077..ff7327289a 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java
@@ -18,17 +18,20 @@
*/
package org.apache.pulsar.common.policies.data;
+import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import lombok.Data;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.function.Consumer;
@Data
+@JsonInclude(JsonInclude.Include.ALWAYS)
@JsonPropertyOrder({ "receivedTotal", "processedSuccessfullyTotal",
"systemExceptionsTotal", "userExceptionsTotal", "avgProcessLatency", "1min",
"lastInvocation", "instances" })
public class FunctionStats {
@@ -55,7 +58,7 @@
/**
* Average process latency for function
**/
- public double avgProcessLatency;
+ public Double avgProcessLatency;
@JsonProperty("1min")
public FunctionInstanceStats.FunctionInstanceStatsDataBase oneMin = new
FunctionInstanceStats.FunctionInstanceStatsDataBase();
@@ -63,9 +66,10 @@
/**
* Timestamp of when the function was last invoked by any instance
**/
- public long lastInvocation;
+ public Long lastInvocation;
@Data
+ @JsonInclude(JsonInclude.Include.ALWAYS)
@JsonPropertyOrder({ "instanceId", "metrics" })
public static class FunctionInstanceStats {
@@ -73,6 +77,7 @@
public int instanceId;
@Data
+ @JsonInclude(JsonInclude.Include.ALWAYS)
@JsonPropertyOrder({ "receivedTotal", "processedSuccessfullyTotal",
"systemExceptionsTotal", "userExceptionsTotal", "avgProcessLatency" })
public static class FunctionInstanceStatsDataBase {
/**
@@ -98,10 +103,11 @@
/**
* Average process latency for function for instance
**/
- public double avgProcessLatency;
+ public Double avgProcessLatency;
}
@Data
+ @JsonInclude(JsonInclude.Include.ALWAYS)
@JsonPropertyOrder({ "receivedTotal", "processedSuccessfullyTotal",
"systemExceptionsTotal", "userExceptionsTotal", "avgProcessLatency", "1min",
"lastInvocation", "userMetrics" })
public static class FunctionInstanceStatsData extends
FunctionInstanceStatsDataBase {
@@ -111,7 +117,7 @@
/**
* Timestamp of when the function was last invoked for instance
**/
- public long lastInvocation;
+ public Long lastInvocation;
/**
* Map of user defined metrics
@@ -130,35 +136,59 @@ public void addInstance(FunctionInstanceStats
functionInstanceStats) {
public FunctionStats calculateOverall() {
- lastInvocation = 0;
- instances.forEach(new Consumer<FunctionInstanceStats>() {
- @Override
- public void accept(FunctionInstanceStats functionInstanceStats) {
+ int nonNullInstances = 0;
+ int nonNullInstancesOneMin = 0;
+ for (FunctionInstanceStats functionInstanceStats : instances) {
FunctionInstanceStats.FunctionInstanceStatsData
functionInstanceStatsData = functionInstanceStats.getMetrics();
receivedTotal += functionInstanceStatsData.receivedTotal;
processedSuccessfullyTotal +=
functionInstanceStatsData.processedSuccessfullyTotal;
systemExceptionsTotal +=
functionInstanceStatsData.systemExceptionsTotal;
userExceptionsTotal +=
functionInstanceStatsData.userExceptionsTotal;
- avgProcessLatency +=
functionInstanceStatsData.avgProcessLatency;
+ if (functionInstanceStatsData.avgProcessLatency != null) {
+ if (avgProcessLatency == null) {
+ avgProcessLatency = 0.0;
+ }
+ avgProcessLatency +=
functionInstanceStatsData.avgProcessLatency;
+ nonNullInstances ++;
+ }
oneMin.receivedTotal +=
functionInstanceStatsData.oneMin.receivedTotal;
oneMin.processedSuccessfullyTotal +=
functionInstanceStatsData.oneMin.processedSuccessfullyTotal;
oneMin.systemExceptionsTotal +=
functionInstanceStatsData.oneMin.systemExceptionsTotal;
oneMin.userExceptionsTotal +=
functionInstanceStatsData.oneMin.userExceptionsTotal;
- oneMin.avgProcessLatency +=
functionInstanceStatsData.oneMin.avgProcessLatency;
-
- if (functionInstanceStatsData.lastInvocation > lastInvocation)
{
- lastInvocation = functionInstanceStatsData.lastInvocation;
+ if (functionInstanceStatsData.oneMin.avgProcessLatency !=
null) {
+ if (oneMin.avgProcessLatency == null) {
+ oneMin.avgProcessLatency = 0.0;
+ }
+ oneMin.avgProcessLatency +=
functionInstanceStatsData.oneMin.avgProcessLatency;
+ nonNullInstancesOneMin ++;
}
+ if (functionInstanceStatsData.lastInvocation != null) {
+ if (lastInvocation == null ||
functionInstanceStatsData.lastInvocation > lastInvocation) {
+ lastInvocation =
functionInstanceStatsData.lastInvocation;
+ }
+ }
}
- });
+
// calculate average from sum
- avgProcessLatency = avgProcessLatency / instances.size();
+ if (nonNullInstances > 0) {
+ avgProcessLatency = avgProcessLatency / nonNullInstances;
+ } else {
+ avgProcessLatency = null;
+ }
// calculate 1min average from sum
- oneMin.avgProcessLatency = oneMin.avgProcessLatency / instances.size();
+ if (nonNullInstancesOneMin > 0) {
+ oneMin.avgProcessLatency = oneMin.avgProcessLatency /
nonNullInstancesOneMin;
+ } else {
+ oneMin.avgProcessLatency = null;
+ }
return this;
}
+
+ public static FunctionStats decode (String json) throws IOException {
+ return ObjectMapperFactory.getThreadLocal().readValue(json,
FunctionStats.class);
+ }
}
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index d5fb80ee73..f0be5119bc 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -29,17 +29,8 @@
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
-import io.prometheus.client.Collector;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.exporter.HTTPServer;
-import io.prometheus.client.hotspot.BufferPoolsExports;
-import io.prometheus.client.hotspot.ClassLoadingExports;
-import io.prometheus.client.hotspot.DefaultExports;
-import io.prometheus.client.hotspot.GarbageCollectorExports;
-import io.prometheus.client.hotspot.MemoryPoolsExports;
-import io.prometheus.client.hotspot.StandardExports;
-import io.prometheus.client.hotspot.ThreadExports;
-import io.prometheus.client.hotspot.VersionInfoExports;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
@@ -59,7 +50,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
/**
* A function container implemented using java thread.
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
index d6863a7c6f..b3663c90fd 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
@@ -219,14 +219,14 @@ public static PulsarAdmin getPulsarAdminClient(String
pulsarWebServiceUrl, Strin
functionInstanceStatsData.setProcessedSuccessfullyTotal(metricsData.getProcessedSuccessfullyTotal());
functionInstanceStatsData.setSystemExceptionsTotal(metricsData.getSystemExceptionsTotal());
functionInstanceStatsData.setUserExceptionsTotal(metricsData.getUserExceptionsTotal());
-
functionInstanceStatsData.setAvgProcessLatency(metricsData.getAvgProcessLatency());
-
functionInstanceStatsData.setLastInvocation(metricsData.getLastInvocation());
+
functionInstanceStatsData.setAvgProcessLatency(metricsData.getAvgProcessLatency()
== 0.0 ? null : metricsData.getAvgProcessLatency());
+
functionInstanceStatsData.setLastInvocation(metricsData.getLastInvocation() ==
0 ? null : metricsData.getLastInvocation());
functionInstanceStatsData.oneMin.setReceivedTotal(metricsData.getReceivedTotal1Min());
functionInstanceStatsData.oneMin.setProcessedSuccessfullyTotal(metricsData.getProcessedSuccessfullyTotal1Min());
functionInstanceStatsData.oneMin.setSystemExceptionsTotal(metricsData.getSystemExceptionsTotal1Min());
functionInstanceStatsData.oneMin.setUserExceptionsTotal(metricsData.getUserExceptionsTotal1Min());
-
functionInstanceStatsData.oneMin.setAvgProcessLatency(metricsData.getAvgProcessLatency1Min());
+
functionInstanceStatsData.oneMin.setAvgProcessLatency(metricsData.getAvgProcessLatency1Min()
== 0.0 ? null : metricsData.getAvgProcessLatency1Min());
// Filter out values that are NaN
Map<String, Double> statsDataMap =
metricsData.getUserMetricsMap().entrySet().stream()
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index ea23d26257..2961afe0e3 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -36,6 +36,7 @@
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
@@ -128,6 +129,7 @@ public Response getFunctionStatus(final
@PathParam("tenant") String tenant,
@ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 403, message = "The requester doesn't have
admin permissions")
})
+ @Produces(MediaType.APPLICATION_JSON)
@Path("/{tenant}/{namespace}/{functionName}/stats")
public FunctionStats getFunctionStats(final @PathParam("tenant") String
tenant,
final @PathParam("namespace") String
namespace,
@@ -144,6 +146,7 @@ public FunctionStats getFunctionStats(final
@PathParam("tenant") String tenant,
@ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 403, message = "The requester doesn't have
admin permissions")
})
+ @Produces(MediaType.APPLICATION_JSON)
@Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stats")
public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData
getFunctionInstanceStats(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 5f6f7b19ae..0bfeb90882 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -23,6 +23,7 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Stopwatch;
import com.google.gson.Gson;
@@ -42,6 +43,7 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.functions.api.examples.AutoSchemaFunction;
import org.apache.pulsar.functions.api.examples.serde.CustomObject;
import org.apache.pulsar.tests.integration.docker.ContainerExecException;
@@ -686,6 +688,9 @@ private void testExclamationFunction(Runtime runtime,
// get function info
getFunctionInfoSuccess(functionName);
+ // get function stats
+ getFunctionStatsEmpty(functionName);
+
// publish and consume result
if (Runtime.JAVA == runtime) {
// java supports schema
@@ -698,6 +703,9 @@ private void testExclamationFunction(Runtime runtime,
// get function status
getFunctionStatus(functionName, numMessages);
+ // get function stats
+ getFunctionStats(functionName, numMessages);
+
// delete function
deleteFunction(functionName);
@@ -800,6 +808,86 @@ private static void getFunctionInfoSuccess(String
functionName) throws Exception
assertTrue(result.getStdout().contains("\"name\": \"" + functionName +
"\""));
}
+ private static void getFunctionStatsEmpty(String functionName) throws
Exception {
+ ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "functions",
+ "stats",
+ "--tenant", "public",
+ "--namespace", "default",
+ "--name", functionName
+ );
+
+ log.info("FUNCTION STATS: {}", result.getStdout());
+ FunctionStats functionStats = FunctionStats.decode(result.getStdout());
+
+ assertEquals(functionStats.getReceivedTotal(), 0);
+ assertEquals(functionStats.getProcessedSuccessfullyTotal(), 0);
+ assertEquals(functionStats.getSystemExceptionsTotal(), 0);
+ assertEquals(functionStats.getUserExceptionsTotal(), 0);
+ assertEquals(functionStats.avgProcessLatency, null);
+ assertEquals(functionStats.oneMin.getReceivedTotal(), 0);
+ assertEquals(functionStats.oneMin.getProcessedSuccessfullyTotal(), 0);
+ assertEquals(functionStats.oneMin.getSystemExceptionsTotal(), 0);
+ assertEquals(functionStats.oneMin.getUserExceptionsTotal(), 0);
+ assertEquals(functionStats.oneMin.getAvgProcessLatency(), null);
+ assertEquals(functionStats.getAvgProcessLatency(),
functionStats.oneMin.getAvgProcessLatency());
+ assertEquals(functionStats.getLastInvocation(), null);
+
+ assertEquals(functionStats.instances.size(), 1);
+ assertEquals(functionStats.instances.get(0).getInstanceId(), 0);
+
assertEquals(functionStats.instances.get(0).getMetrics().getReceivedTotal(), 0);
+
assertEquals(functionStats.instances.get(0).getMetrics().getProcessedSuccessfullyTotal(),
0);
+
assertEquals(functionStats.instances.get(0).getMetrics().getSystemExceptionsTotal(),
0);
+
assertEquals(functionStats.instances.get(0).getMetrics().getUserExceptionsTotal(),
0);
+
assertEquals(functionStats.instances.get(0).getMetrics().avgProcessLatency,
null);
+
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getReceivedTotal(),
0);
+
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getProcessedSuccessfullyTotal(),
0);
+
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getSystemExceptionsTotal(),
0);
+
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getUserExceptionsTotal(),
0);
+
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getAvgProcessLatency(),
null);
+ }
+
+ private static void getFunctionStats(String functionName, int numMessages)
throws Exception {
+ ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "functions",
+ "stats",
+ "--tenant", "public",
+ "--namespace", "default",
+ "--name", functionName
+ );
+
+ log.info("FUNCTION STATS: {}", result.getStdout());
+
+ FunctionStats functionStats = FunctionStats.decode(result.getStdout());
+ assertEquals(functionStats.getReceivedTotal(), numMessages);
+ assertEquals(functionStats.getProcessedSuccessfullyTotal(),
numMessages);
+ assertEquals(functionStats.getSystemExceptionsTotal(), 0);
+ assertEquals(functionStats.getUserExceptionsTotal(), 0);
+ assertTrue(functionStats.avgProcessLatency > 0);
+ assertEquals(functionStats.oneMin.getReceivedTotal(), numMessages);
+ assertEquals(functionStats.oneMin.getProcessedSuccessfullyTotal(),
numMessages);
+ assertEquals(functionStats.oneMin.getSystemExceptionsTotal(), 0);
+ assertEquals(functionStats.oneMin.getUserExceptionsTotal(), 0);
+ assertTrue(functionStats.oneMin.getAvgProcessLatency() > 0);
+ assertEquals(functionStats.getAvgProcessLatency(),
functionStats.oneMin.getAvgProcessLatency());
+ assertTrue(functionStats.getLastInvocation() > 0);
+
+ assertEquals(functionStats.instances.size(), 1);
+ assertEquals(functionStats.instances.get(0).getInstanceId(), 0);
+
assertEquals(functionStats.instances.get(0).getMetrics().getReceivedTotal(),
numMessages);
+
assertEquals(functionStats.instances.get(0).getMetrics().getProcessedSuccessfullyTotal(),
numMessages);
+
assertEquals(functionStats.instances.get(0).getMetrics().getSystemExceptionsTotal(),
0);
+
assertEquals(functionStats.instances.get(0).getMetrics().getUserExceptionsTotal(),
0);
+
assertTrue(functionStats.instances.get(0).getMetrics().avgProcessLatency > 0);
+
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getReceivedTotal(),
numMessages);
+
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getProcessedSuccessfullyTotal(),
numMessages);
+
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getSystemExceptionsTotal(),
0);
+
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getUserExceptionsTotal(),
0);
+
assertTrue(functionStats.instances.get(0).getMetrics().oneMin.getAvgProcessLatency()
> 0);
+ }
+
private static void getFunctionInfoNotFound(String functionName) throws
Exception {
try {
pulsarCluster.getAnyWorker().execCmd(
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services