This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b721ae3 Correcting metrics and adding tests (#3050)
b721ae3 is described below
commit b721ae3bcacc0d93c259d6f59465a0c0b0d402e0
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Mon Nov 26 09:45:10 2018 -0500
Correcting metrics and adding tests (#3050)
* Correcting metrics and adding tests
* remove commmented out code
* remove space
* fix integration test
* improving impl
* fix ObjectMapper
---
.../pulsar/broker/admin/impl/FunctionsBase.java | 3 +
.../apache/pulsar/io/PulsarFunctionE2ETest.java | 66 +++++++++++++++-
.../pulsar/common/policies/data/FunctionStats.java | 64 +++++++++++-----
.../pulsar/functions/runtime/JavaInstanceMain.java | 10 ---
.../org/apache/pulsar/functions/worker/Utils.java | 6 +-
.../worker/rest/api/v2/FunctionApiV2Resource.java | 3 +
.../integration/functions/PulsarFunctionsTest.java | 88 ++++++++++++++++++++++
7 files changed, 207 insertions(+), 33 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 84cf8e0..8a13a8a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -30,6 +30,7 @@ import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
@@ -189,6 +190,7 @@ public class FunctionsBase extends AdminResource implements
Supplier<WorkerServi
@ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 403, message = "The requester doesn't have
admin permissions")
})
+ @Produces(MediaType.APPLICATION_JSON)
@Path("/{tenant}/{namespace}/{functionName}/stats")
public FunctionStats getFunctionStats(final @PathParam("tenant") String
tenant,
final @PathParam("namespace") String
namespace,
@@ -205,6 +207,7 @@ public class FunctionsBase extends AdminResource implements
Supplier<WorkerServi
@ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 403, message = "The requester doesn't have
admin permissions")
})
+ @Produces(MediaType.APPLICATION_JSON)
@Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stats")
public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData
getFunctionInstanceStats(final @PathParam("tenant") String tenant,
final @PathParam("namespace")
String namespace,
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 6d89634..f9491f3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -353,6 +353,55 @@ public class PulsarFunctionE2ETest {
// validate pulsar sink consumer has started on the topic
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
+ // validate stats are empty
+ FunctionRuntimeManager functionRuntimeManager =
functionsWorkerService.getFunctionRuntimeManager();
+ FunctionStats functionStats =
functionRuntimeManager.getFunctionStats(tenant, namespacePortion,
+ functionName, null);
+ FunctionStats functionStatsFromAdmin =
admin.functions().getFunctionStats(tenant, namespacePortion,
+ functionName);
+
+ assertEquals(functionStats, functionStatsFromAdmin);
+
+ assertEquals(functionStats.getReceivedTotal(), 0);
+ assertEquals(functionStats.getProcessedSuccessfullyTotal(), 0);
+ assertEquals(functionStats.getSystemExceptionsTotal(), 0);
+ assertEquals(functionStats.getUserExceptionsTotal(), 0);
+ assertEquals(functionStats.avgProcessLatency, null);
+ assertEquals(functionStats.oneMin.getReceivedTotal(), 0);
+ assertEquals(functionStats.oneMin.getProcessedSuccessfullyTotal(), 0);
+ assertEquals(functionStats.oneMin.getSystemExceptionsTotal(), 0);
+ assertEquals(functionStats.oneMin.getUserExceptionsTotal(), 0);
+ assertEquals(functionStats.oneMin.getAvgProcessLatency(), null);
+ assertEquals(functionStats.getAvgProcessLatency(),
functionStats.oneMin.getAvgProcessLatency());
+ assertEquals(functionStats.getLastInvocation(), null);
+
+ assertEquals(functionStats.instances.size(), 1);
+ assertEquals(functionStats.instances.get(0).getInstanceId(), 0);
+
assertEquals(functionStats.instances.get(0).getMetrics().getReceivedTotal(), 0);
+
assertEquals(functionStats.instances.get(0).getMetrics().getProcessedSuccessfullyTotal(),
0);
+
assertEquals(functionStats.instances.get(0).getMetrics().getSystemExceptionsTotal(),
0);
+
assertEquals(functionStats.instances.get(0).getMetrics().getUserExceptionsTotal(),
0);
+
assertEquals(functionStats.instances.get(0).getMetrics().avgProcessLatency,
null);
+
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getReceivedTotal(),
0);
+
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getProcessedSuccessfullyTotal(),
0);
+
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getSystemExceptionsTotal(),
0);
+
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getUserExceptionsTotal(),
0);
+
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getAvgProcessLatency(),
null);
+
+
assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(),
functionStats.instances.get(0).getMetrics().oneMin.getAvgProcessLatency());
+
assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(),
functionStats.getAvgProcessLatency());
+
+ // validate function instance stats empty
+ FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData
functionInstanceStats = functionRuntimeManager.getFunctionInstanceStats(tenant,
namespacePortion,
+ functionName, 0, null);
+
+ FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData
functionInstanceStatsAdmin = admin.functions().getFunctionStats(tenant,
namespacePortion,
+ functionName, 0);
+
+ assertEquals(functionInstanceStats, functionInstanceStatsAdmin);
+ assertEquals(functionInstanceStats,
functionStats.instances.get(0).getMetrics());
+
+
int totalMsgs = 10;
for (int i = 0; i < totalMsgs; i++) {
String data = "my-message-" + i;
@@ -367,11 +416,12 @@ public class PulsarFunctionE2ETest {
}
}, 5, 200);
- FunctionRuntimeManager functionRuntimeManager =
functionsWorkerService.getFunctionRuntimeManager();
- FunctionStats functionStats =
functionRuntimeManager.getFunctionStats(tenant, namespacePortion,
+
+ // get stats after producing
+ functionStats = functionRuntimeManager.getFunctionStats(tenant,
namespacePortion,
functionName, null);
- FunctionStats functionStatsFromAdmin =
admin.functions().getFunctionStats(tenant, namespacePortion,
+ functionStatsFromAdmin = admin.functions().getFunctionStats(tenant,
namespacePortion,
functionName);
assertEquals(functionStats, functionStatsFromAdmin);
@@ -404,6 +454,16 @@ public class PulsarFunctionE2ETest {
assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(),
functionStats.instances.get(0).getMetrics().oneMin.getAvgProcessLatency());
assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(),
functionStats.getAvgProcessLatency());
+
+ // validate function instance stats
+ functionInstanceStats =
functionRuntimeManager.getFunctionInstanceStats(tenant, namespacePortion,
+ functionName, 0, null);
+
+ functionInstanceStatsAdmin =
admin.functions().getFunctionStats(tenant, namespacePortion,
+ functionName, 0);
+
+ assertEquals(functionInstanceStats, functionInstanceStatsAdmin);
+ assertEquals(functionInstanceStats,
functionStats.instances.get(0).getMetrics());
}
@Test(timeOut = 20000)
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java
index ba274c1..ff73272 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java
@@ -18,17 +18,20 @@
*/
package org.apache.pulsar.common.policies.data;
+import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import lombok.Data;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.function.Consumer;
@Data
+@JsonInclude(JsonInclude.Include.ALWAYS)
@JsonPropertyOrder({ "receivedTotal", "processedSuccessfullyTotal",
"systemExceptionsTotal", "userExceptionsTotal", "avgProcessLatency", "1min",
"lastInvocation", "instances" })
public class FunctionStats {
@@ -55,7 +58,7 @@ public class FunctionStats {
/**
* Average process latency for function
**/
- public double avgProcessLatency;
+ public Double avgProcessLatency;
@JsonProperty("1min")
public FunctionInstanceStats.FunctionInstanceStatsDataBase oneMin = new
FunctionInstanceStats.FunctionInstanceStatsDataBase();
@@ -63,9 +66,10 @@ public class FunctionStats {
/**
* Timestamp of when the function was last invoked by any instance
**/
- public long lastInvocation;
+ public Long lastInvocation;
@Data
+ @JsonInclude(JsonInclude.Include.ALWAYS)
@JsonPropertyOrder({ "instanceId", "metrics" })
public static class FunctionInstanceStats {
@@ -73,6 +77,7 @@ public class FunctionStats {
public int instanceId;
@Data
+ @JsonInclude(JsonInclude.Include.ALWAYS)
@JsonPropertyOrder({ "receivedTotal", "processedSuccessfullyTotal",
"systemExceptionsTotal", "userExceptionsTotal", "avgProcessLatency" })
public static class FunctionInstanceStatsDataBase {
/**
@@ -98,10 +103,11 @@ public class FunctionStats {
/**
* Average process latency for function for instance
**/
- public double avgProcessLatency;
+ public Double avgProcessLatency;
}
@Data
+ @JsonInclude(JsonInclude.Include.ALWAYS)
@JsonPropertyOrder({ "receivedTotal", "processedSuccessfullyTotal",
"systemExceptionsTotal", "userExceptionsTotal", "avgProcessLatency", "1min",
"lastInvocation", "userMetrics" })
public static class FunctionInstanceStatsData extends
FunctionInstanceStatsDataBase {
@@ -111,7 +117,7 @@ public class FunctionStats {
/**
* Timestamp of when the function was last invoked for instance
**/
- public long lastInvocation;
+ public Long lastInvocation;
/**
* Map of user defined metrics
@@ -130,35 +136,59 @@ public class FunctionStats {
public FunctionStats calculateOverall() {
- lastInvocation = 0;
- instances.forEach(new Consumer<FunctionInstanceStats>() {
- @Override
- public void accept(FunctionInstanceStats functionInstanceStats) {
+ int nonNullInstances = 0;
+ int nonNullInstancesOneMin = 0;
+ for (FunctionInstanceStats functionInstanceStats : instances) {
FunctionInstanceStats.FunctionInstanceStatsData
functionInstanceStatsData = functionInstanceStats.getMetrics();
receivedTotal += functionInstanceStatsData.receivedTotal;
processedSuccessfullyTotal +=
functionInstanceStatsData.processedSuccessfullyTotal;
systemExceptionsTotal +=
functionInstanceStatsData.systemExceptionsTotal;
userExceptionsTotal +=
functionInstanceStatsData.userExceptionsTotal;
- avgProcessLatency +=
functionInstanceStatsData.avgProcessLatency;
+ if (functionInstanceStatsData.avgProcessLatency != null) {
+ if (avgProcessLatency == null) {
+ avgProcessLatency = 0.0;
+ }
+ avgProcessLatency +=
functionInstanceStatsData.avgProcessLatency;
+ nonNullInstances ++;
+ }
oneMin.receivedTotal +=
functionInstanceStatsData.oneMin.receivedTotal;
oneMin.processedSuccessfullyTotal +=
functionInstanceStatsData.oneMin.processedSuccessfullyTotal;
oneMin.systemExceptionsTotal +=
functionInstanceStatsData.oneMin.systemExceptionsTotal;
oneMin.userExceptionsTotal +=
functionInstanceStatsData.oneMin.userExceptionsTotal;
- oneMin.avgProcessLatency +=
functionInstanceStatsData.oneMin.avgProcessLatency;
-
- if (functionInstanceStatsData.lastInvocation > lastInvocation)
{
- lastInvocation = functionInstanceStatsData.lastInvocation;
+ if (functionInstanceStatsData.oneMin.avgProcessLatency !=
null) {
+ if (oneMin.avgProcessLatency == null) {
+ oneMin.avgProcessLatency = 0.0;
+ }
+ oneMin.avgProcessLatency +=
functionInstanceStatsData.oneMin.avgProcessLatency;
+ nonNullInstancesOneMin ++;
}
+ if (functionInstanceStatsData.lastInvocation != null) {
+ if (lastInvocation == null ||
functionInstanceStatsData.lastInvocation > lastInvocation) {
+ lastInvocation =
functionInstanceStatsData.lastInvocation;
+ }
+ }
}
- });
+
// calculate average from sum
- avgProcessLatency = avgProcessLatency / instances.size();
+ if (nonNullInstances > 0) {
+ avgProcessLatency = avgProcessLatency / nonNullInstances;
+ } else {
+ avgProcessLatency = null;
+ }
// calculate 1min average from sum
- oneMin.avgProcessLatency = oneMin.avgProcessLatency / instances.size();
+ if (nonNullInstancesOneMin > 0) {
+ oneMin.avgProcessLatency = oneMin.avgProcessLatency /
nonNullInstancesOneMin;
+ } else {
+ oneMin.avgProcessLatency = null;
+ }
return this;
}
+
+ public static FunctionStats decode (String json) throws IOException {
+ return ObjectMapperFactory.getThreadLocal().readValue(json,
FunctionStats.class);
+ }
}
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index d5fb80e..f0be511 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -29,17 +29,8 @@ import com.google.protobuf.util.JsonFormat;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
-import io.prometheus.client.Collector;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.exporter.HTTPServer;
-import io.prometheus.client.hotspot.BufferPoolsExports;
-import io.prometheus.client.hotspot.ClassLoadingExports;
-import io.prometheus.client.hotspot.DefaultExports;
-import io.prometheus.client.hotspot.GarbageCollectorExports;
-import io.prometheus.client.hotspot.MemoryPoolsExports;
-import io.prometheus.client.hotspot.StandardExports;
-import io.prometheus.client.hotspot.ThreadExports;
-import io.prometheus.client.hotspot.VersionInfoExports;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
@@ -59,7 +50,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
/**
* A function container implemented using java thread.
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
index d6863a7..b3663c9 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
@@ -219,14 +219,14 @@ public final class Utils {
functionInstanceStatsData.setProcessedSuccessfullyTotal(metricsData.getProcessedSuccessfullyTotal());
functionInstanceStatsData.setSystemExceptionsTotal(metricsData.getSystemExceptionsTotal());
functionInstanceStatsData.setUserExceptionsTotal(metricsData.getUserExceptionsTotal());
-
functionInstanceStatsData.setAvgProcessLatency(metricsData.getAvgProcessLatency());
-
functionInstanceStatsData.setLastInvocation(metricsData.getLastInvocation());
+
functionInstanceStatsData.setAvgProcessLatency(metricsData.getAvgProcessLatency()
== 0.0 ? null : metricsData.getAvgProcessLatency());
+
functionInstanceStatsData.setLastInvocation(metricsData.getLastInvocation() ==
0 ? null : metricsData.getLastInvocation());
functionInstanceStatsData.oneMin.setReceivedTotal(metricsData.getReceivedTotal1Min());
functionInstanceStatsData.oneMin.setProcessedSuccessfullyTotal(metricsData.getProcessedSuccessfullyTotal1Min());
functionInstanceStatsData.oneMin.setSystemExceptionsTotal(metricsData.getSystemExceptionsTotal1Min());
functionInstanceStatsData.oneMin.setUserExceptionsTotal(metricsData.getUserExceptionsTotal1Min());
-
functionInstanceStatsData.oneMin.setAvgProcessLatency(metricsData.getAvgProcessLatency1Min());
+
functionInstanceStatsData.oneMin.setAvgProcessLatency(metricsData.getAvgProcessLatency1Min()
== 0.0 ? null : metricsData.getAvgProcessLatency1Min());
// Filter out values that are NaN
Map<String, Double> statsDataMap =
metricsData.getUserMetricsMap().entrySet().stream()
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index ea23d26..2961afe 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -36,6 +36,7 @@ import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
@@ -128,6 +129,7 @@ public class FunctionApiV2Resource extends
FunctionApiResource {
@ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 403, message = "The requester doesn't have
admin permissions")
})
+ @Produces(MediaType.APPLICATION_JSON)
@Path("/{tenant}/{namespace}/{functionName}/stats")
public FunctionStats getFunctionStats(final @PathParam("tenant") String
tenant,
final @PathParam("namespace") String
namespace,
@@ -144,6 +146,7 @@ public class FunctionApiV2Resource extends
FunctionApiResource {
@ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 403, message = "The requester doesn't have
admin permissions")
})
+ @Produces(MediaType.APPLICATION_JSON)
@Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stats")
public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData
getFunctionInstanceStats(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 5f6f7b1..0bfeb90 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -23,6 +23,7 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Stopwatch;
import com.google.gson.Gson;
@@ -42,6 +43,7 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.functions.api.examples.AutoSchemaFunction;
import org.apache.pulsar.functions.api.examples.serde.CustomObject;
import org.apache.pulsar.tests.integration.docker.ContainerExecException;
@@ -686,6 +688,9 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
// get function info
getFunctionInfoSuccess(functionName);
+ // get function stats
+ getFunctionStatsEmpty(functionName);
+
// publish and consume result
if (Runtime.JAVA == runtime) {
// java supports schema
@@ -698,6 +703,9 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
// get function status
getFunctionStatus(functionName, numMessages);
+ // get function stats
+ getFunctionStats(functionName, numMessages);
+
// delete function
deleteFunction(functionName);
@@ -800,6 +808,86 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
assertTrue(result.getStdout().contains("\"name\": \"" + functionName +
"\""));
}
+ private static void getFunctionStatsEmpty(String functionName) throws
Exception {
+ ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "functions",
+ "stats",
+ "--tenant", "public",
+ "--namespace", "default",
+ "--name", functionName
+ );
+
+ log.info("FUNCTION STATS: {}", result.getStdout());
+ FunctionStats functionStats = FunctionStats.decode(result.getStdout());
+
+ assertEquals(functionStats.getReceivedTotal(), 0);
+ assertEquals(functionStats.getProcessedSuccessfullyTotal(), 0);
+ assertEquals(functionStats.getSystemExceptionsTotal(), 0);
+ assertEquals(functionStats.getUserExceptionsTotal(), 0);
+ assertEquals(functionStats.avgProcessLatency, null);
+ assertEquals(functionStats.oneMin.getReceivedTotal(), 0);
+ assertEquals(functionStats.oneMin.getProcessedSuccessfullyTotal(), 0);
+ assertEquals(functionStats.oneMin.getSystemExceptionsTotal(), 0);
+ assertEquals(functionStats.oneMin.getUserExceptionsTotal(), 0);
+ assertEquals(functionStats.oneMin.getAvgProcessLatency(), null);
+ assertEquals(functionStats.getAvgProcessLatency(),
functionStats.oneMin.getAvgProcessLatency());
+ assertEquals(functionStats.getLastInvocation(), null);
+
+ assertEquals(functionStats.instances.size(), 1);
+ assertEquals(functionStats.instances.get(0).getInstanceId(), 0);
+
assertEquals(functionStats.instances.get(0).getMetrics().getReceivedTotal(), 0);
+
assertEquals(functionStats.instances.get(0).getMetrics().getProcessedSuccessfullyTotal(),
0);
+
assertEquals(functionStats.instances.get(0).getMetrics().getSystemExceptionsTotal(),
0);
+
assertEquals(functionStats.instances.get(0).getMetrics().getUserExceptionsTotal(),
0);
+
assertEquals(functionStats.instances.get(0).getMetrics().avgProcessLatency,
null);
+
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getReceivedTotal(),
0);
+
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getProcessedSuccessfullyTotal(),
0);
+
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getSystemExceptionsTotal(),
0);
+
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getUserExceptionsTotal(),
0);
+
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getAvgProcessLatency(),
null);
+ }
+
+ private static void getFunctionStats(String functionName, int numMessages)
throws Exception {
+ ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "functions",
+ "stats",
+ "--tenant", "public",
+ "--namespace", "default",
+ "--name", functionName
+ );
+
+ log.info("FUNCTION STATS: {}", result.getStdout());
+
+ FunctionStats functionStats = FunctionStats.decode(result.getStdout());
+ assertEquals(functionStats.getReceivedTotal(), numMessages);
+ assertEquals(functionStats.getProcessedSuccessfullyTotal(),
numMessages);
+ assertEquals(functionStats.getSystemExceptionsTotal(), 0);
+ assertEquals(functionStats.getUserExceptionsTotal(), 0);
+ assertTrue(functionStats.avgProcessLatency > 0);
+ assertEquals(functionStats.oneMin.getReceivedTotal(), numMessages);
+ assertEquals(functionStats.oneMin.getProcessedSuccessfullyTotal(),
numMessages);
+ assertEquals(functionStats.oneMin.getSystemExceptionsTotal(), 0);
+ assertEquals(functionStats.oneMin.getUserExceptionsTotal(), 0);
+ assertTrue(functionStats.oneMin.getAvgProcessLatency() > 0);
+ assertEquals(functionStats.getAvgProcessLatency(),
functionStats.oneMin.getAvgProcessLatency());
+ assertTrue(functionStats.getLastInvocation() > 0);
+
+ assertEquals(functionStats.instances.size(), 1);
+ assertEquals(functionStats.instances.get(0).getInstanceId(), 0);
+
assertEquals(functionStats.instances.get(0).getMetrics().getReceivedTotal(),
numMessages);
+
assertEquals(functionStats.instances.get(0).getMetrics().getProcessedSuccessfullyTotal(),
numMessages);
+
assertEquals(functionStats.instances.get(0).getMetrics().getSystemExceptionsTotal(),
0);
+
assertEquals(functionStats.instances.get(0).getMetrics().getUserExceptionsTotal(),
0);
+
assertTrue(functionStats.instances.get(0).getMetrics().avgProcessLatency > 0);
+
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getReceivedTotal(),
numMessages);
+
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getProcessedSuccessfullyTotal(),
numMessages);
+
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getSystemExceptionsTotal(),
0);
+
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getUserExceptionsTotal(),
0);
+
assertTrue(functionStats.instances.get(0).getMetrics().oneMin.getAvgProcessLatency()
> 0);
+ }
+
private static void getFunctionInfoNotFound(String functionName) throws
Exception {
try {
pulsarCluster.getAnyWorker().execCmd(