jerrypeng closed pull request #3261: add sink and source prometheus stats
URL: https://github.com/apache/pulsar/pull/3261
This is a PR merged from a forked repository.
Because GitHub does not show the original diff of a foreign (fork-based)
pull request once it has been merged, the diff is reproduced below for
the sake of provenance:
diff --git a/distribution/io/src/assemble/io.xml
b/distribution/io/src/assemble/io.xml
index a0704c32b3..10ea149cd3 100644
--- a/distribution/io/src/assemble/io.xml
+++ b/distribution/io/src/assemble/io.xml
@@ -83,7 +83,7 @@
</file>
<file>
-
<source>${basedir}/../../pulsar-io/data-genenator/target/pulsar-io-data-generator-${project.version}.nar</source>
+
<source>${basedir}/../../pulsar-io/data-generator/target/pulsar-io-data-generator-${project.version}.nar</source>
<outputDirectory>connectors</outputDirectory>
<fileMode>644</fileMode>
</file>
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 07c9d6f3a3..c45113288a 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -271,6 +271,13 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-data-generator</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
@@ -328,7 +335,8 @@
<echo>copy test examples package</echo>
<mkdir dir="${basedir}/src/test/resources"/>
<copy
file="${basedir}/../pulsar-functions/java-examples/target/pulsar-functions-api-examples.jar"
tofile="${basedir}/src/test/resources/pulsar-functions-api-examples.jar"/>
- </tasks>
+ <copy
file="${basedir}/../pulsar-io/data-generator/target/pulsar-io-data-generator-${project.version}.nar"
tofile="${basedir}/src/test/resources/pulsar-io-data-generator.nar"/>
+ </tasks>
</configuration>
</execution>
</executions>
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 7841453c20..fa1177d408 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -40,6 +40,8 @@
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Utils;
+import org.apache.pulsar.common.io.SinkConfig;
+import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.common.policies.data.FunctionStatus;
@@ -65,6 +67,7 @@
import java.net.InetAddress;
import java.net.URL;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -78,6 +81,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
+import static
org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
@@ -191,6 +195,8 @@ void setup(Method method) throws Exception {
propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
admin.tenants().updateTenant(tenant, propAdmin);
+ System.setProperty(JAVA_INSTANCE_JAR_PROPERTY, "");
+
Thread.sleep(100);
}
@@ -257,6 +263,28 @@ protected static FunctionConfig
createFunctionConfig(String tenant, String names
return functionConfig;
}
+ private static SourceConfig createSourceConfig(String tenant, String
namespace, String functionName, String sinkTopic) {
+ SourceConfig sourceConfig = new SourceConfig();
+ sourceConfig.setTenant(tenant);
+ sourceConfig.setNamespace(namespace);
+ sourceConfig.setName(functionName);
+ sourceConfig.setParallelism(1);
+
sourceConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+ sourceConfig.setTopicName(sinkTopic);
+ return sourceConfig;
+ }
+
+ private static SinkConfig createSinkConfig(String tenant, String
namespace, String functionName, String sourceTopic, String subName) {
+ SinkConfig sinkConfig = new SinkConfig();
+ sinkConfig.setTenant(tenant);
+ sinkConfig.setNamespace(namespace);
+ sinkConfig.setName(functionName);
+ sinkConfig.setParallelism(1);
+
sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+ sinkConfig.setInputs(Collections.singleton(sourceTopic));
+ sinkConfig.setSourceSubscriptionName(subName);
+ return sinkConfig;
+ }
/**
* Validates pulsar sink e2e functionality on functions.
*
@@ -324,6 +352,285 @@ public void testE2EPulsarFunction() throws Exception {
}
+ @Test(timeOut = 20000)
+ public void testPulsarSinkStats() throws Exception {
+ final String namespacePortion = "io";
+ final String replNamespace = tenant + "/" + namespacePortion;
+ final String sourceTopic = "persistent://" + replNamespace + "/input";
+ final String functionName = "PulsarSink-test";
+ final String propertyKey = "key";
+ final String propertyValue = "value";
+ final String subscriptionName = "test-sub";
+ admin.namespaces().createNamespace(replNamespace);
+ Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+ admin.namespaces().setNamespaceReplicationClusters(replNamespace,
clusters);
+
+ // create a producer that creates a topic at broker
+ Producer<String> producer =
pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
+
+ String jarFilePathUrl = Utils.FILE + ":" +
getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
+ SinkConfig sinkConfig = createSinkConfig(tenant, namespacePortion,
functionName, sourceTopic, subscriptionName);
+ admin.sink().createSinkWithUrl(sinkConfig, jarFilePathUrl);
+
+ admin.sink().updateSinkWithUrl(sinkConfig, jarFilePathUrl);
+
+ retryStrategically((test) -> {
+ try {
+ return
admin.topics().getStats(sourceTopic).subscriptions.size() == 1;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 5, 150);
+ // validate pulsar sink consumer has started on the topic
+
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
+
+ // validate prometheus metrics empty
+ String prometheusMetrics = getPrometheusMetrics(brokerWebServicePort);
+ log.info("prometheus metrics: {}", prometheusMetrics);
+
+ Map<String, Metric> metrics = parseMetrics(prometheusMetrics);
+ Metric m = metrics.get("pulsar_sink_received_total");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_sink_received_total_1min");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_sink_written_total");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_sink_written_total_1min");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_sink_sink_exceptions_total");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_sink_sink_exceptions_total_1min");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_sink_system_exceptions_total");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_sink_system_exceptions_total_1min");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_sink_last_invocation");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, 0.0);
+
+ int totalMsgs = 10;
+ for (int i = 0; i < totalMsgs; i++) {
+ String data = "my-message-" + i;
+ producer.newMessage().property(propertyKey,
propertyValue).value(data).send();
+ }
+ retryStrategically((test) -> {
+ try {
+ SubscriptionStats subStats =
admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
+ return subStats.unackedMessages == 0 &&
subStats.msgThroughputOut == totalMsgs;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 5, 200);
+
+ // get stats after producing
+ prometheusMetrics = getPrometheusMetrics(brokerWebServicePort);
+ log.info("prometheusMetrics: {}", prometheusMetrics);
+
+ metrics = parseMetrics(prometheusMetrics);
+ m = metrics.get("pulsar_sink_received_total");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, (double) totalMsgs);
+ m = metrics.get("pulsar_sink_received_total_1min");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, (double) totalMsgs);
+ m = metrics.get("pulsar_sink_written_total");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, (double) totalMsgs);
+ m = metrics.get("pulsar_sink_written_total_1min");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, (double) totalMsgs);
+ m = metrics.get("pulsar_sink_sink_exceptions_total");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_sink_sink_exceptions_total_1min");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_sink_system_exceptions_total");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_sink_system_exceptions_total_1min");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_sink_last_invocation");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertTrue(m.value > 0.0);
+ }
+
+ @Test(timeOut = 20000)
+ public void testPulsarSourceStats() throws Exception {
+ final String namespacePortion = "io";
+ final String replNamespace = tenant + "/" + namespacePortion;
+ final String sinkTopic = "persistent://" + replNamespace + "/output";
+ final String functionName = "PulsarSource-test";
+ admin.namespaces().createNamespace(replNamespace);
+ Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+ admin.namespaces().setNamespaceReplicationClusters(replNamespace,
clusters);
+
+ String jarFilePathUrl = Utils.FILE + ":" +
getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
+ SourceConfig sourceConfig = createSourceConfig(tenant,
namespacePortion, functionName, sinkTopic);
+ admin.source().createSourceWithUrl(sourceConfig, jarFilePathUrl);
+
+ admin.source().updateSourceWithUrl(sourceConfig, jarFilePathUrl);
+
+ retryStrategically((test) -> {
+ try {
+ return (admin.topics().getStats(sinkTopic).publishers.size()
== 1) && (admin.topics().getInternalStats(sinkTopic).numberOfEntries > 4);
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 10, 150);
+ assertEquals(admin.topics().getStats(sinkTopic).publishers.size(), 1);
+
+ String prometheusMetrics = getPrometheusMetrics(brokerWebServicePort);
+ log.info("prometheusMetrics: {}", prometheusMetrics);
+
+ Map<String, Metric> metrics = parseMetrics(prometheusMetrics);
+ Metric m = metrics.get("pulsar_source_received_total");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertTrue(m.value > 0.0);
+ m = metrics.get("pulsar_source_received_total_1min");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertTrue(m.value > 0.0);
+ m = metrics.get("pulsar_source_written_total");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertTrue(m.value > 0.0);
+ m = metrics.get("pulsar_source_written_total_1min");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertTrue(m.value > 0.0);
+ m = metrics.get("pulsar_source_source_exceptions_total");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_source_source_exceptions_total_1min");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_source_system_exceptions_total");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_source_system_exceptions_total_1min");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_source_last_invocation");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertTrue(m.value > 0.0);
+ }
+
@Test(timeOut = 20000)
public void testPulsarFunctionStats() throws Exception {
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 69fd6c865d..460ba75ac7 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -34,9 +34,14 @@
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.state.StateContextImpl;
+import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
+import org.apache.pulsar.functions.instance.stats.FunctionStatsManager;
+import org.apache.pulsar.functions.instance.stats.SinkStatsManager;
+import org.apache.pulsar.functions.instance.stats.SourceStatsManager;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.source.TopicSchema;
+import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;
@@ -52,7 +57,7 @@
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkState;
-import static
org.apache.pulsar.functions.instance.FunctionStatsManager.USER_METRIC_PREFIX;
+import static
org.apache.pulsar.functions.instance.stats.FunctionStatsManager.USER_METRIC_PREFIX;
/**
* This class implements the Context interface exposed to the user.
@@ -87,12 +92,13 @@
private final static String[] userMetricsLabelNames;
static {
// add label to indicate user metric
- userMetricsLabelNames =
Arrays.copyOf(FunctionStatsManager.metricsLabelNames,
FunctionStatsManager.metricsLabelNames.length + 1);
- userMetricsLabelNames[FunctionStatsManager.metricsLabelNames.length] =
"metric";
+ userMetricsLabelNames =
Arrays.copyOf(ComponentStatsManager.metricsLabelNames,
ComponentStatsManager.metricsLabelNames.length + 1);
+ userMetricsLabelNames[ComponentStatsManager.metricsLabelNames.length]
= "metric";
}
public ContextImpl(InstanceConfig config, Logger logger, PulsarClient
client, List<String> inputTopics,
- SecretsProvider secretsProvider, CollectorRegistry
collectorRegistry, String[] metricsLabels) {
+ SecretsProvider secretsProvider, CollectorRegistry
collectorRegistry, String[] metricsLabels,
+ Utils.ComponentType componentType) {
this.config = config;
this.logger = logger;
this.publishProducers = new HashMap<>();
@@ -119,9 +125,23 @@ public ContextImpl(InstanceConfig config, Logger logger,
PulsarClient client, Li
}
this.metricsLabels = metricsLabels;
+ String prefix;
+ switch (componentType) {
+ case FUNCTION:
+ prefix = FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX;
+ break;
+ case SINK:
+ prefix = SinkStatsManager.PULSAR_SINK_METRICS_PREFIX;
+ break;
+ case SOURCE:
+ prefix = SourceStatsManager.PULSAR_SOURCE_METRICS_PREFIX;
+ break;
+ default:
+ throw new RuntimeException("Unknown component type: " +
componentType);
+ }
this.userMetricsSummary = Summary.build()
- .name("pulsar_function_user_metric")
- .help("Pulsar Function user defined metric.")
+ .name(prefix + ComponentStatsManager.USER_METRIC_PREFIX)
+ .help("User defined metric.")
.labelNames(userMetricsLabelNames)
.quantile(0.5, 0.01)
.quantile(0.9, 0.01)
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
index 86a9aa211b..9e3273685f 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
@@ -19,15 +19,22 @@
package org.apache.pulsar.functions.instance;
import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.pulsar.functions.utils.Utils.ComponentType.FUNCTION;
+import static org.apache.pulsar.functions.utils.Utils.ComponentType.SINK;
+import static org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE;
import lombok.experimental.UtilityClass;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.functions.api.SerDe;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.utils.Reflections;
import net.jodah.typetools.TypeResolver;
+import org.apache.pulsar.functions.utils.Utils;
@UtilityClass
public class InstanceUtils {
@@ -77,4 +84,23 @@
return Reflections.createInstance(className, baseClass, clsLoader);
}
}
+
+ public Utils.ComponentType calculateSubjectType(Function.FunctionDetails
functionDetails) {
+ Function.SourceSpec sourceSpec = functionDetails.getSource();
+ Function.SinkSpec sinkSpec = functionDetails.getSink();
+ if (sourceSpec.getInputSpecsCount() == 0) {
+ return SOURCE;
+ }
+ // Now its between sink and function
+
+ if (!isEmpty(sinkSpec.getBuiltin())) {
+ // if its built in, its a sink
+ return SINK;
+ }
+
+ if (isEmpty(sinkSpec.getClassName()) ||
sinkSpec.getClassName().equals(PulsarSink.class.getName())) {
+ return FUNCTION;
+ }
+ return SINK;
+ }
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index e858e907f5..7c36b584d0 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -55,6 +55,8 @@
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.state.StateContextImpl;
+import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
+import org.apache.pulsar.functions.instance.stats.FunctionStatsManager;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.proto.InstanceCommunication;
@@ -116,7 +118,7 @@
// function stats
@Getter
- private FunctionStatsManager stats;
+ private ComponentStatsManager stats;
private Record<?> currentRecord;
@@ -202,7 +204,8 @@ ContextImpl setupContext() {
}
Logger instanceLog = LoggerFactory.getLogger(
"function-" + instanceConfig.getFunctionDetails().getName());
- return new ContextImpl(instanceConfig, instanceLog, client,
inputTopics, secretsProvider, collectorRegistry, metricsLabels);
+ return new ContextImpl(instanceConfig, instanceLog, client,
inputTopics, secretsProvider,
+ collectorRegistry, metricsLabels,
InstanceUtils.calculateSubjectType(this.instanceConfig.getFunctionDetails()));
}
/**
@@ -216,7 +219,9 @@ public void run() {
if (this.collectorRegistry == null) {
this.collectorRegistry = new CollectorRegistry();
}
- this.stats = new FunctionStatsManager(this.collectorRegistry,
this.metricsLabels, this.instanceCache.getScheduledExecutorService());
+ this.stats =
ComponentStatsManager.getStatsManager(this.collectorRegistry,
this.metricsLabels,
+ this.instanceCache.getScheduledExecutorService(),
+
InstanceUtils.calculateSubjectType(this.instanceConfig.getFunctionDetails()));
ContextImpl contextImpl = setupContext();
javaInstance = setupJavaInstance(contextImpl);
@@ -284,6 +289,7 @@ public void run() {
private void loadJars() throws Exception {
try {
+ log.info("jarFile: {}", jarFile);
// Let's first try to treat it as a nar archive
fnCache.registerFunctionInstanceWithArchive(
instanceConfig.getFunctionId(),
@@ -546,6 +552,12 @@ private Builder createMetricsDataBuilder() {
stats.getLatestSystemExceptions().forEach(ex -> {
functionStatusBuilder.addLatestSystemExceptions(ex);
});
+ stats.getLatestSourceExceptions().forEach(ex -> {
+ functionStatusBuilder.addLatestSourceExceptions(ex);
+ });
+ stats.getLatestSinkExceptions().forEach(ex -> {
+ functionStatusBuilder.addLatestSinkExceptions(ex);
+ });
functionStatusBuilder.setAverageLatency(stats.getAvgProcessLatency());
functionStatusBuilder.setLastInvocationTime((long)
stats.getLastInvocation());
return functionStatusBuilder;
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
new file mode 100644
index 0000000000..3cb654b4a0
--- /dev/null
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.stats;
+
+import com.google.common.collect.EvictingQueue;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.common.TextFormat;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.utils.Utils;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Arrays;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public abstract class ComponentStatsManager implements AutoCloseable {
+
+ protected String[] metricsLabels;
+
+ protected ScheduledFuture<?> scheduledFuture;
+
+ protected final CollectorRegistry collectorRegistry;
+
+ protected final EvictingQueue EMPTY_QUEUE = EvictingQueue.create(0);
+
+ public final static String USER_METRIC_PREFIX = "user_metric_";
+
+ public static final String[] metricsLabelNames = {"tenant", "namespace",
"function", "instance_id", "cluster", "fqfn"};
+
+ protected static final String[] exceptionMetricsLabelNames;
+
+ static {
+ exceptionMetricsLabelNames = Arrays.copyOf(metricsLabelNames,
metricsLabelNames.length + 2);
+ exceptionMetricsLabelNames[metricsLabelNames.length] = "error";
+ exceptionMetricsLabelNames[metricsLabelNames.length + 1] = "ts";
+ }
+
+ public static ComponentStatsManager getStatsManager(CollectorRegistry
collectorRegistry,
+ String[] metricsLabels,
+ ScheduledExecutorService
scheduledExecutorService,
+ Utils.ComponentType componentType) {
+ switch (componentType) {
+ case FUNCTION:
+ return new FunctionStatsManager(collectorRegistry,
metricsLabels, scheduledExecutorService);
+ case SOURCE:
+ return new SourceStatsManager(collectorRegistry,
metricsLabels, scheduledExecutorService);
+ case SINK:
+ return new SinkStatsManager(collectorRegistry, metricsLabels,
scheduledExecutorService);
+ default:
+ throw new RuntimeException("Unknown component type: " +
componentType);
+ }
+ }
+
+ public ComponentStatsManager(CollectorRegistry collectorRegistry,
+ String[] metricsLabels,
+ ScheduledExecutorService scheduledExecutorService) {
+
+ this.collectorRegistry = collectorRegistry;
+ this.metricsLabels = metricsLabels;
+
+ scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(new
Runnable() {
+ @Override
+ public void run() {
+ try {
+ reset();
+ } catch (Exception e) {
+ log.error("Failed to reset metrics for 1min window", e);
+ }
+ }
+ }, 1, 1, TimeUnit.MINUTES);
+ }
+
+ public abstract void reset();
+
+ public abstract void incrTotalReceived();
+
+ public abstract void incrTotalProcessedSuccessfully();
+
+ public abstract void incrSysExceptions(Throwable sysException);
+
+ public abstract void incrUserExceptions(Exception userException);
+
+ public abstract void incrSourceExceptions(Exception userException);
+
+ public abstract void incrSinkExceptions(Exception userException);
+
+ public abstract void setLastInvocation(long ts);
+
+ public abstract void processTimeStart();
+
+ public abstract void processTimeEnd();
+
+ public abstract double getTotalProcessedSuccessfully();
+
+ public abstract double getTotalRecordsReceived();
+
+ public abstract double getTotalSysExceptions();
+
+ public abstract double getTotalUserExceptions();
+
+ public abstract double getLastInvocation();
+
+ public abstract double getAvgProcessLatency();
+
+ public abstract double getTotalProcessedSuccessfully1min();
+
+ public abstract double getTotalRecordsReceived1min();
+
+ public abstract double getTotalSysExceptions1min();
+
+ public abstract double getTotalUserExceptions1min();
+
+ public abstract double getAvgProcessLatency1min();
+
+ public abstract
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
getLatestUserExceptions();
+
+ public abstract
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
getLatestSystemExceptions();
+
+ public abstract
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
getLatestSourceExceptions();
+
+ public abstract
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
getLatestSinkExceptions();
+
+ public String getStatsAsString() throws IOException {
+ StringWriter outputWriter = new StringWriter();
+
+ TextFormat.write004(outputWriter,
collectorRegistry.metricFamilySamples());
+
+ return outputWriter.toString();
+ }
+
+ protected InstanceCommunication.FunctionStatus.ExceptionInformation
getExceptionInfo(Throwable th, long ts) {
+ InstanceCommunication.FunctionStatus.ExceptionInformation.Builder
exceptionInfoBuilder =
+
InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder().setMsSinceEpoch(ts);
+ String msg = th.getMessage();
+ if (msg != null) {
+ exceptionInfoBuilder.setExceptionString(msg);
+ }
+ return exceptionInfoBuilder.build();
+ }
+
+
+ @Override
+ public void close() {
+ if (scheduledFuture != null) {
+ scheduledFuture.cancel(false);
+ scheduledFuture = null;
+ }
+ }
+}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
similarity index 64%
rename from
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
rename to
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
index 818fdb3fe9..d03bfe2ab9 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
@@ -16,25 +16,21 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.functions.instance;
+package org.apache.pulsar.functions.instance.stats;
import com.google.common.collect.EvictingQueue;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import io.prometheus.client.Summary;
-import io.prometheus.client.exporter.common.TextFormat;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.util.RateLimiter;
import org.apache.pulsar.functions.proto.InstanceCommunication;
-import java.io.IOException;
-import java.io.StringWriter;
import java.util.Arrays;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
@@ -43,18 +39,9 @@
@Slf4j
@Getter
@Setter
-public class FunctionStatsManager implements AutoCloseable {
-
- static final String[] metricsLabelNames = {"tenant", "namespace",
"function", "instance_id", "cluster", "fqfn"};
- static final String[] exceptionMetricsLabelNames;
- static {
- exceptionMetricsLabelNames = Arrays.copyOf(metricsLabelNames,
metricsLabelNames.length + 2);
- exceptionMetricsLabelNames[metricsLabelNames.length] = "error";
- exceptionMetricsLabelNames[metricsLabelNames.length + 1] = "ts";
- }
+public class FunctionStatsManager extends ComponentStatsManager{
public static final String PULSAR_FUNCTION_METRICS_PREFIX =
"pulsar_function_";
- public final static String USER_METRIC_PREFIX = "user_metric_";
/** Declare metric names **/
public static final String PROCESSED_SUCCESSFULLY_TOTAL =
"processed_successfully_total";
@@ -81,16 +68,12 @@
final Counter statTotalSysExceptions;
final Counter statTotalUserExceptions;
-
- final Counter statTotalSourceExceptions;
-
- final Counter statTotalSinkExceptions;
-
+
final Summary statProcessLatency;
final Gauge statlastInvocation;
- final Counter statTotalRecordsRecieved;
+ final Counter statTotalRecordsReceived;
// windowed metrics
@@ -99,14 +82,10 @@
final Counter statTotalSysExceptions1min;
final Counter statTotalUserExceptions1min;
-
- final Counter statTotalSourceExceptions1min;
-
- final Counter statTotalSinkExceptions1min;
-
+
final Summary statProcessLatency1min;
- final Counter statTotalRecordsRecieved1min;
+ final Counter statTotalRecordsReceived1min;
// exceptions
@@ -122,47 +101,28 @@
private final Counter.Child _statTotalProcessedSuccessfully;
private final Counter.Child _statTotalSysExceptions;
private final Counter.Child _statTotalUserExceptions;
- private final Counter.Child _statTotalSourceExceptions;
- private final Counter.Child _statTotalSinkExceptions;
private final Summary.Child _statProcessLatency;
private final Gauge.Child _statlastInvocation;
- private final Counter.Child _statTotalRecordsRecieved;
+ private final Counter.Child _statTotalRecordsReceived;
private Counter.Child _statTotalProcessedSuccessfully1min;
private Counter.Child _statTotalSysExceptions1min;
private Counter.Child _statTotalUserExceptions1min;
- private Counter.Child _statTotalSourceExceptions1min;
- private Counter.Child _statTotalSinkExceptions1min;
private Summary.Child _statProcessLatency1min;
- private Counter.Child _statTotalRecordsRecieved1min;
-
- private String[] metricsLabels;
-
- private ScheduledFuture<?> scheduledFuture;
-
- private final CollectorRegistry collectorRegistry;
+ private Counter.Child _statTotalRecordsReceived1min;
@Getter
private
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
latestUserExceptions = EvictingQueue.create(10);
@Getter
private
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
latestSystemExceptions = EvictingQueue.create(10);
- @Getter
- private
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
latestSourceExceptions = EvictingQueue.create(10);
- @Getter
- private
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
latestSinkExceptions = EvictingQueue.create(10);
private final RateLimiter userExceptionRateLimiter;
private final RateLimiter sysExceptionRateLimiter;
- private final RateLimiter sourceExceptionRateLimiter;
-
- private final RateLimiter sinkExceptionRateLimiter;
-
- public FunctionStatsManager(CollectorRegistry collectorRegistry, String[]
metricsLabels, ScheduledExecutorService scheduledExecutorService) {
-
- this.collectorRegistry = collectorRegistry;
-
- this.metricsLabels = metricsLabels;
+ public FunctionStatsManager(CollectorRegistry collectorRegistry,
+ String[] metricsLabels,
+ ScheduledExecutorService
scheduledExecutorService) {
+ super(collectorRegistry, metricsLabels, scheduledExecutorService);
statTotalProcessedSuccessfully = Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX +
PROCESSED_SUCCESSFULLY_TOTAL)
@@ -185,20 +145,6 @@ public FunctionStatsManager(CollectorRegistry
collectorRegistry, String[] metric
.register(collectorRegistry);
_statTotalUserExceptions =
statTotalUserExceptions.labels(metricsLabels);
- statTotalSourceExceptions = Counter.build()
- .name(PULSAR_FUNCTION_METRICS_PREFIX + SOURCE_EXCEPTIONS_TOTAL)
- .help("Total number of source exceptions.")
- .labelNames(metricsLabelNames)
- .register(collectorRegistry);
- _statTotalSourceExceptions =
statTotalSourceExceptions.labels(metricsLabels);
-
- statTotalSinkExceptions = Counter.build()
- .name(PULSAR_FUNCTION_METRICS_PREFIX + SINK_EXCEPTIONS_TOTAL)
- .help("Total number of sink exceptions.")
- .labelNames(metricsLabelNames)
- .register(collectorRegistry);
- _statTotalSinkExceptions =
statTotalSinkExceptions.labels(metricsLabels);
-
statProcessLatency = Summary.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS)
.help("Process latency in milliseconds.")
@@ -217,12 +163,12 @@ public FunctionStatsManager(CollectorRegistry
collectorRegistry, String[] metric
.register(collectorRegistry);
_statlastInvocation = statlastInvocation.labels(metricsLabels);
- statTotalRecordsRecieved = Counter.build()
+ statTotalRecordsReceived = Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL)
.help("Total number of messages received from source.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
- _statTotalRecordsRecieved =
statTotalRecordsRecieved.labels(metricsLabels);
+ _statTotalRecordsReceived =
statTotalRecordsReceived.labels(metricsLabels);
statTotalProcessedSuccessfully1min = Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX +
PROCESSED_SUCCESSFULLY_TOTAL_1min)
@@ -245,20 +191,6 @@ public FunctionStatsManager(CollectorRegistry
collectorRegistry, String[] metric
.register(collectorRegistry);
_statTotalUserExceptions1min =
statTotalUserExceptions1min.labels(metricsLabels);
- statTotalSourceExceptions1min = Counter.build()
- .name(PULSAR_FUNCTION_METRICS_PREFIX +
SOURCE_EXCEPTIONS_TOTAL_1min)
- .help("Total number of source exceptions in the last 1
minute.")
- .labelNames(metricsLabelNames)
- .register(collectorRegistry);
- _statTotalSourceExceptions1min =
statTotalSourceExceptions1min.labels(metricsLabels);
-
- statTotalSinkExceptions1min = Counter.build()
- .name(PULSAR_FUNCTION_METRICS_PREFIX +
SINK_EXCEPTIONS_TOTAL_1min)
- .help("Total number of sink exceptions in the last 1 minute.")
- .labelNames(metricsLabelNames)
- .register(collectorRegistry);
- _statTotalSinkExceptions1min =
statTotalSinkExceptions1min.labels(metricsLabels);
-
statProcessLatency1min = Summary.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS_1min)
.help("Process latency in milliseconds in the last 1 minute.")
@@ -270,12 +202,12 @@ public FunctionStatsManager(CollectorRegistry
collectorRegistry, String[] metric
.register(collectorRegistry);
_statProcessLatency1min = statProcessLatency1min.labels(metricsLabels);
- statTotalRecordsRecieved1min = Counter.build()
+ statTotalRecordsReceived1min = Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL_1min)
.help("Total number of messages received from source in the
last 1 minute.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
- _statTotalRecordsRecieved1min =
statTotalRecordsRecieved1min.labels(metricsLabels);
+ _statTotalRecordsReceived1min =
statTotalRecordsReceived1min.labels(metricsLabels);
userExceptions = Gauge.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + "user_exception")
@@ -300,22 +232,8 @@ public FunctionStatsManager(CollectorRegistry
collectorRegistry, String[] metric
.help("Exception from sink.")
.register(collectorRegistry);
- scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(new
Runnable() {
- @Override
- public void run() {
- try {
- reset();
- } catch (Exception e) {
- log.error("Failed to reset metrics for 1min window", e);
- }
- }
- }, 1, 1, TimeUnit.MINUTES);
-
userExceptionRateLimiter = new RateLimiter(scheduledExecutorService,
5, 1, TimeUnit.MINUTES);
sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5,
1, TimeUnit.MINUTES);
- sourceExceptionRateLimiter = new RateLimiter(scheduledExecutorService,
5, 1, TimeUnit.MINUTES);
- sinkExceptionRateLimiter = new RateLimiter(scheduledExecutorService,
5, 1, TimeUnit.MINUTES);
-
}
public void addUserException(Exception ex) {
@@ -346,87 +264,54 @@ public void addSystemException(Throwable ex) {
}
}
- public void addSourceException(Throwable ex) {
- long ts = System.currentTimeMillis();
- InstanceCommunication.FunctionStatus.ExceptionInformation info =
getExceptionInfo(ex, ts);
- latestSourceExceptions.add(info);
-
- // report exception throw prometheus
- if (sourceExceptionRateLimiter.tryAcquire()) {
- String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels,
metricsLabels.length + 2);
- exceptionMetricsLabels[exceptionMetricsLabels.length - 2] =
ex.getMessage() != null ? ex.getMessage() : "";
- exceptionMetricsLabels[exceptionMetricsLabels.length - 1] =
String.valueOf(ts);
- sourceExceptions.labels(exceptionMetricsLabels).set(1.0);
- }
- }
-
- public void addSinkException(Throwable ex) {
- long ts = System.currentTimeMillis();
- InstanceCommunication.FunctionStatus.ExceptionInformation info =
getExceptionInfo(ex, ts);
- latestSinkExceptions.add(info);
-
- // report exception throw prometheus
- if (sinkExceptionRateLimiter.tryAcquire()) {
- String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels,
metricsLabels.length + 2);
- exceptionMetricsLabels[exceptionMetricsLabels.length - 2] =
ex.getMessage() != null ? ex.getMessage() : "";
- exceptionMetricsLabels[exceptionMetricsLabels.length - 1] =
String.valueOf(ts);
- sinkExceptions.labels(exceptionMetricsLabels).set(1.0);
- }
- }
-
- private InstanceCommunication.FunctionStatus.ExceptionInformation
getExceptionInfo(Throwable th, long ts) {
- InstanceCommunication.FunctionStatus.ExceptionInformation.Builder
exceptionInfoBuilder =
-
InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder().setMsSinceEpoch(ts);
- String msg = th.getMessage();
- if (msg != null) {
- exceptionInfoBuilder.setExceptionString(msg);
- }
- return exceptionInfoBuilder.build();
- }
-
+ @Override
public void incrTotalReceived() {
- _statTotalRecordsRecieved.inc();
- _statTotalRecordsRecieved1min.inc();
+ _statTotalRecordsReceived.inc();
+ _statTotalRecordsReceived1min.inc();
}
+ @Override
public void incrTotalProcessedSuccessfully() {
_statTotalProcessedSuccessfully.inc();
_statTotalProcessedSuccessfully1min.inc();
}
+ @Override
public void incrSysExceptions(Throwable sysException) {
_statTotalSysExceptions.inc();
_statTotalSysExceptions1min.inc();
addSystemException(sysException);
}
+ @Override
public void incrUserExceptions(Exception userException) {
_statTotalUserExceptions.inc();
_statTotalUserExceptions1min.inc();
addUserException(userException);
}
- public void incrSourceExceptions(Exception userException) {
- _statTotalSourceExceptions.inc();
- _statTotalSourceExceptions1min.inc();
- addSourceException(userException);
+ @Override
+ public void incrSourceExceptions(Exception ex) {
+ incrSysExceptions(ex);
}
- public void incrSinkExceptions(Exception userException) {
- _statTotalSinkExceptions.inc();
- _statTotalSinkExceptions1min.inc();
- addSinkException(userException);
+ @Override
+ public void incrSinkExceptions(Exception ex) {
+ incrSysExceptions(ex);
}
+ @Override
public void setLastInvocation(long ts) {
_statlastInvocation.set(ts);
}
private Long processTimeStart;
+ @Override
public void processTimeStart() {
processTimeStart = System.nanoTime();
}
+ @Override
public void processTimeEnd() {
if (processTimeStart != null) {
double endTimeMs = ((double) System.nanoTime() - processTimeStart)
/ 1.0E6D;
@@ -435,30 +320,27 @@ public void processTimeEnd() {
}
}
+ @Override
public double getTotalProcessedSuccessfully() {
return _statTotalProcessedSuccessfully.get();
}
+ @Override
public double getTotalRecordsReceived() {
- return _statTotalRecordsRecieved.get();
+ return _statTotalRecordsReceived.get();
}
+ @Override
public double getTotalSysExceptions() {
return _statTotalSysExceptions.get();
}
+ @Override
public double getTotalUserExceptions() {
return _statTotalUserExceptions.get();
}
-
- public double getTotalSourceExceptions() {
- return _statTotalSourceExceptions.get();
- }
-
- public double getTotalSinkExceptions() {
- return _statTotalSinkExceptions.get();
- }
-
+
+ @Override
public double getLastInvocation() {
return _statlastInvocation.get();
}
@@ -484,33 +366,40 @@ public double getProcessLatency99_9P() {
return _statProcessLatency.get().quantiles.get(0.999);
}
+ @Override
public double getTotalProcessedSuccessfully1min() {
return _statTotalProcessedSuccessfully1min.get();
}
+ @Override
public double getTotalRecordsReceived1min() {
- return _statTotalRecordsRecieved1min.get();
+ return _statTotalRecordsReceived1min.get();
}
+ @Override
public double getTotalSysExceptions1min() {
return _statTotalSysExceptions1min.get();
}
+ @Override
public double getTotalUserExceptions1min() {
return _statTotalUserExceptions1min.get();
}
-
- public double getTotalSourceExceptions1min() {
- return _statTotalSourceExceptions1min.get();
+
+ @Override
+ public double getAvgProcessLatency1min() {
+ return _statProcessLatency1min.get().count <= 0.0
+ ? 0 : _statProcessLatency1min.get().sum /
_statProcessLatency1min.get().count;
}
- public double getTotalSinkExceptions1min() {
- return _statTotalSinkExceptions1min.get();
+ @Override
+ public
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
getLatestSourceExceptions() {
+ return EMPTY_QUEUE;
}
- public double getAvgProcessLatency1min() {
- return _statProcessLatency1min.get().count <= 0.0
- ? 0 : _statProcessLatency1min.get().sum /
_statProcessLatency1min.get().count;
+ @Override
+ public
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
getLatestSinkExceptions() {
+ return EMPTY_QUEUE;
}
public double getProcessLatency50P1min() {
@@ -529,6 +418,7 @@ public double getProcessLatency99_9P1min() {
return _statProcessLatency1min.get().quantiles.get(0.999);
}
+ @Override
public void reset() {
statTotalProcessedSuccessfully1min.clear();
_statTotalProcessedSuccessfully1min =
statTotalProcessedSuccessfully1min.labels(metricsLabels);
@@ -539,37 +429,13 @@ public void reset() {
statTotalUserExceptions1min.clear();
_statTotalUserExceptions1min =
statTotalUserExceptions1min.labels(metricsLabels);
- statTotalSourceExceptions1min.clear();
- _statTotalSourceExceptions1min =
statTotalSourceExceptions1min.labels(metricsLabels);
-
- statTotalSinkExceptions1min.clear();
- _statTotalSinkExceptions1min =
statTotalSinkExceptions1min.labels(metricsLabels);
-
statProcessLatency1min.clear();
_statProcessLatency1min = statProcessLatency1min.labels(metricsLabels);
- statTotalRecordsRecieved1min.clear();
- _statTotalRecordsRecieved1min =
statTotalRecordsRecieved1min.labels(metricsLabels);
+ statTotalRecordsReceived1min.clear();
+ _statTotalRecordsReceived1min =
statTotalRecordsReceived1min.labels(metricsLabels);
latestUserExceptions.clear();
latestSystemExceptions.clear();
- latestSourceExceptions.clear();
- latestSinkExceptions.clear();
- }
-
- public String getStatsAsString() throws IOException {
- StringWriter outputWriter = new StringWriter();
-
- TextFormat.write004(outputWriter,
collectorRegistry.metricFamilySamples());
-
- return outputWriter.toString();
- }
-
- @Override
- public void close() {
- if (scheduledFuture != null) {
- scheduledFuture.cancel(false);
- scheduledFuture = null;
- }
}
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
new file mode 100644
index 0000000000..14e4c21ae7
--- /dev/null
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.stats;
+
+import com.google.common.collect.EvictingQueue;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
+import lombok.Getter;
+import org.apache.pulsar.common.util.RateLimiter;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+
+import java.util.Arrays;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class SinkStatsManager extends ComponentStatsManager {
+
+ public static final String PULSAR_SINK_METRICS_PREFIX = "pulsar_sink_";
+
+ /** Declare metric names **/
+ public static final String SYSTEM_EXCEPTIONS_TOTAL =
"system_exceptions_total";
+ public static final String SINK_EXCEPTIONS_TOTAL = "sink_exceptions_total";
+ public static final String LAST_INVOCATION = "last_invocation";
+ public static final String RECEIVED_TOTAL = "received_total";
+ public static final String WRITTEN_TOTAL = "written_total";
+
+ public static final String SYSTEM_EXCEPTIONS_TOTAL_1min =
"system_exceptions_total_1min";
+ public static final String SINK_EXCEPTIONS_TOTAL_1min =
"sink_exceptions_total_1min";
+ public static final String RECEIVED_TOTAL_1min = "received_total_1min";
+ public static final String WRITTEN_TOTAL_1min = "written_total_1min";
+
+ /** Declare Prometheus stats **/
+
+ private final Counter statTotalRecordsReceived;
+
+ private final Counter statTotalSysExceptions;
+
+ private final Counter statTotalSinkExceptions;
+
+ private final Counter statTotalWritten;
+
+ private final Gauge statlastInvocation;
+
+ // windowed metrics
+ private final Counter statTotalRecordsReceived1min;
+
+ private final Counter statTotalSysExceptions1min;
+
+ private final Counter statTotalSinkExceptions1min;
+
+ private final Counter statTotalWritten1min;
+
+ // exceptions
+
+ final Gauge sysExceptions;
+
+ final Gauge sinkExceptions;
+
+ // As an optimization
+ private final Counter.Child _statTotalRecordsReceived;
+ private final Counter.Child _statTotalSysExceptions;
+ private final Counter.Child _statTotalSinkExceptions;
+ private final Counter.Child _statTotalWritten;
+ private final Gauge.Child _statlastInvocation;
+
+ private Counter.Child _statTotalRecordsReceived1min;
+ private Counter.Child _statTotalSysExceptions1min;
+ private Counter.Child _statTotalSinkExceptions1min;
+ private Counter.Child _statTotalWritten1min;
+
+ @Getter
+ private
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
latestSystemExceptions = EvictingQueue.create(10);
+ @Getter
+ private
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
latestSinkExceptions = EvictingQueue.create(10);
+
+ private final RateLimiter sysExceptionRateLimiter;
+
+ private final RateLimiter sinkExceptionRateLimiter;
+
+
+ public SinkStatsManager(CollectorRegistry collectorRegistry, String[]
metricsLabels, ScheduledExecutorService
+ scheduledExecutorService) {
+ super(collectorRegistry, metricsLabels, scheduledExecutorService);
+
+ statTotalRecordsReceived = Counter.build()
+ .name(PULSAR_SINK_METRICS_PREFIX + RECEIVED_TOTAL)
+ .help("Total number of records sink has received from Pulsar
topic(s).")
+ .labelNames(metricsLabelNames)
+ .register(collectorRegistry);
+ _statTotalRecordsReceived =
statTotalRecordsReceived.labels(metricsLabels);
+
+ statTotalSysExceptions = Counter.build()
+ .name(PULSAR_SINK_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL)
+ .help("Total number of system exceptions.")
+ .labelNames(metricsLabelNames)
+ .register(collectorRegistry);
+ _statTotalSysExceptions = statTotalSysExceptions.labels(metricsLabels);
+
+ statTotalSinkExceptions = Counter.build()
+ .name(PULSAR_SINK_METRICS_PREFIX + SINK_EXCEPTIONS_TOTAL)
+ .help("Total number of sink exceptions.")
+ .labelNames(metricsLabelNames)
+ .register(collectorRegistry);
+ _statTotalSinkExceptions =
statTotalSinkExceptions.labels(metricsLabels);
+
+ statTotalWritten = Counter.build()
+ .name(PULSAR_SINK_METRICS_PREFIX + WRITTEN_TOTAL)
+ .help("Total number of records processed by sink.")
+ .labelNames(metricsLabelNames)
+ .register(collectorRegistry);
+ _statTotalWritten = statTotalWritten.labels(metricsLabels);
+
+ statlastInvocation = Gauge.build()
+ .name(PULSAR_SINK_METRICS_PREFIX + LAST_INVOCATION)
+ .help("The timestamp of the last invocation of the sink.")
+ .labelNames(metricsLabelNames)
+ .register(collectorRegistry);
+ _statlastInvocation = statlastInvocation.labels(metricsLabels);
+
+ statTotalRecordsReceived1min = Counter.build()
+ .name(PULSAR_SINK_METRICS_PREFIX + RECEIVED_TOTAL_1min)
+ .help("Total number of messages sink has received from Pulsar
topic(s) in the last 1 minute.")
+ .labelNames(metricsLabelNames)
+ .register(collectorRegistry);
+ _statTotalRecordsReceived1min =
statTotalRecordsReceived1min.labels(metricsLabels);
+
+ statTotalSysExceptions1min = Counter.build()
+ .name(PULSAR_SINK_METRICS_PREFIX +
SYSTEM_EXCEPTIONS_TOTAL_1min)
+ .help("Total number of system exceptions in the last 1
minute.")
+ .labelNames(metricsLabelNames)
+ .register(collectorRegistry);
+ _statTotalSysExceptions1min =
statTotalSysExceptions1min.labels(metricsLabels);
+
+ statTotalSinkExceptions1min = Counter.build()
+ .name(PULSAR_SINK_METRICS_PREFIX + SINK_EXCEPTIONS_TOTAL_1min)
+ .help("Total number of sink exceptions in the last 1 minute.")
+ .labelNames(metricsLabelNames)
+ .register(collectorRegistry);
+ _statTotalSinkExceptions1min =
statTotalSinkExceptions1min.labels(metricsLabels);
+
+ statTotalWritten1min = Counter.build()
+ .name(PULSAR_SINK_METRICS_PREFIX + WRITTEN_TOTAL_1min)
+ .help("Total number of records processed by sink the last 1
minute.")
+ .labelNames(metricsLabelNames)
+ .register(collectorRegistry);
+ _statTotalWritten1min = statTotalWritten1min.labels(metricsLabels);
+
+ sysExceptions = Gauge.build()
+ .name(PULSAR_SINK_METRICS_PREFIX + "system_exception")
+ .labelNames(exceptionMetricsLabelNames)
+ .help("Exception from system code.")
+ .register(collectorRegistry);
+
+ sinkExceptions = Gauge.build()
+ .name(PULSAR_SINK_METRICS_PREFIX + "sink_exception")
+ .labelNames(exceptionMetricsLabelNames)
+ .help("Exception from sink.")
+ .register(collectorRegistry);
+
+ sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5,
1, TimeUnit.MINUTES);
+ sinkExceptionRateLimiter = new RateLimiter(scheduledExecutorService,
5, 1, TimeUnit.MINUTES);
+ }
+
+ @Override
+ public void reset() {
+ statTotalRecordsReceived1min.clear();
+ _statTotalRecordsReceived1min =
statTotalRecordsReceived1min.labels(metricsLabels);
+
+ statTotalSysExceptions1min.clear();
+ _statTotalSysExceptions1min =
statTotalSysExceptions1min.labels(metricsLabels);
+
+ statTotalSinkExceptions1min.clear();
+ _statTotalSinkExceptions1min =
statTotalSinkExceptions1min.labels(metricsLabels);
+
+ statTotalWritten1min.clear();
+ _statTotalWritten1min = statTotalWritten1min.labels(metricsLabels);
+
+ latestSystemExceptions.clear();
+ latestSinkExceptions.clear();
+ }
+
+ @Override
+ public void incrTotalReceived() {
+ _statTotalRecordsReceived.inc();
+ _statTotalRecordsReceived1min.inc();
+ }
+
+ @Override
+ public void incrTotalProcessedSuccessfully() {
+ _statTotalWritten.inc();
+ _statTotalWritten1min.inc();
+ }
+
+ @Override
+ public void incrSysExceptions(Throwable ex) {
+ long ts = System.currentTimeMillis();
+ InstanceCommunication.FunctionStatus.ExceptionInformation info =
getExceptionInfo(ex, ts);
+ latestSystemExceptions.add(info);
+
+ // report exception through prometheus
+ if (sysExceptionRateLimiter.tryAcquire()) {
+ String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels,
metricsLabels.length + 2);
+ exceptionMetricsLabels[exceptionMetricsLabels.length - 2] =
ex.getMessage() != null ? ex.getMessage() : "";
+ exceptionMetricsLabels[exceptionMetricsLabels.length - 1] =
String.valueOf(ts);
+ sysExceptions.labels(exceptionMetricsLabels).set(1.0);
+ }
+ }
+
+ @Override
+ public void incrUserExceptions(Exception ex) {
+ incrSysExceptions(ex);
+ }
+
+ @Override
+ public void incrSourceExceptions(Exception ex) {
+ incrSysExceptions(ex);
+ }
+
+ @Override
+ public void incrSinkExceptions(Exception ex) {
+ long ts = System.currentTimeMillis();
+ InstanceCommunication.FunctionStatus.ExceptionInformation info =
getExceptionInfo(ex, ts);
+ latestSinkExceptions.add(info);
+
+ // report exception through prometheus
+ if (sinkExceptionRateLimiter.tryAcquire()) {
+ String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels,
metricsLabels.length + 2);
+ exceptionMetricsLabels[exceptionMetricsLabels.length - 2] =
ex.getMessage() != null ? ex.getMessage() : "";
+ exceptionMetricsLabels[exceptionMetricsLabels.length - 1] =
String.valueOf(ts);
+ sinkExceptions.labels(exceptionMetricsLabels).set(1.0);
+ }
+ }
+
+ @Override
+ public void setLastInvocation(long ts) {
+ _statlastInvocation.set(ts);
+ }
+
+ @Override
+ public void processTimeStart() {
+ //no-op
+ }
+
+ @Override
+ public void processTimeEnd() {
+ //no-op
+ }
+
+ @Override
+ public double getTotalProcessedSuccessfully() {
+ return _statTotalWritten.get();
+ }
+
+ @Override
+ public double getTotalRecordsReceived() {
+ return _statTotalRecordsReceived.get();
+ }
+
+ @Override
+ public double getTotalSysExceptions() {
+ return _statTotalSysExceptions.get();
+ }
+
+ @Override
+ public double getTotalUserExceptions() {
+ return 0;
+ }
+
+ @Override
+ public double getLastInvocation() {
+ return _statlastInvocation.get();
+ }
+
+ @Override
+ public double getAvgProcessLatency() {
+ return 0;
+ }
+
+ @Override
+ public double getTotalProcessedSuccessfully1min() {
+ return _statTotalWritten1min.get();
+ }
+
+ @Override
+ public double getTotalRecordsReceived1min() {
+ return _statTotalRecordsReceived1min.get();
+ }
+
+ @Override
+ public double getTotalSysExceptions1min() {
+ return _statTotalSysExceptions1min.get();
+ }
+
+ @Override
+ public double getTotalUserExceptions1min() {
+ return 0;
+ }
+
+ @Override
+ public double getAvgProcessLatency1min() {
+ return 0;
+ }
+
+ @Override
+ public
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
getLatestUserExceptions() {
+ return EMPTY_QUEUE;
+ }
+
+ @Override
+ public
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
getLatestSystemExceptions() {
+ return latestSystemExceptions;
+ }
+
+ @Override
+ public
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
getLatestSourceExceptions() {
+ return EMPTY_QUEUE;
+ }
+
+ @Override
+ public
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
getLatestSinkExceptions() {
+ return latestSinkExceptions;
+ }
+}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
new file mode 100644
index 0000000000..5679f2e1e4
--- /dev/null
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.stats;
+
+import com.google.common.collect.EvictingQueue;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
+import lombok.Getter;
+import org.apache.pulsar.common.util.RateLimiter;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+
+import java.util.Arrays;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Prometheus-backed statistics for a source component.
+ *
+ * <p>All metrics are registered under the {@code pulsar_source_} prefix. For each cumulative
+ * counter there is a matching {@code *_1min} counter that is cleared by {@link #reset()}
+ * (NOTE(review): presumably invoked periodically by the base class - confirm).
+ *
+ * <p>The most recent system and source exceptions are additionally retained in two
+ * bounded queues (10 entries each) and exposed through the {@code getLatest*Exceptions}
+ * accessors.
+ */
+public class SourceStatsManager extends ComponentStatsManager {
+
+    public static final String PULSAR_SOURCE_METRICS_PREFIX = "pulsar_source_";
+
+    /** Declare metric names **/
+    public static final String SYSTEM_EXCEPTIONS_TOTAL = "system_exceptions_total";
+    public static final String SOURCE_EXCEPTIONS_TOTAL = "source_exceptions_total";
+    public static final String LAST_INVOCATION = "last_invocation";
+    public static final String RECEIVED_TOTAL = "received_total";
+    public static final String WRITTEN_TOTAL = "written_total";
+
+    public static final String SYSTEM_EXCEPTIONS_TOTAL_1min = "system_exceptions_total_1min";
+    public static final String SOURCE_EXCEPTIONS_TOTAL_1min = "source_exceptions_total_1min";
+    public static final String RECEIVED_TOTAL_1min = "received_total_1min";
+    public static final String WRITTEN_TOTAL_1min = "written_total_1min";
+
+    /** Declare Prometheus stats **/
+
+    private final Counter statTotalRecordsReceived;
+
+    private final Counter statTotalSysExceptions;
+
+    private final Counter statTotalSourceExceptions;
+
+    private final Counter statTotalWritten;
+
+    private final Gauge statlastInvocation;
+
+    // windowed metrics
+    private final Counter statTotalRecordsReceived1min;
+
+    private final Counter statTotalSysExceptions1min;
+
+    private final Counter statTotalSourceExceptions1min;
+
+    private final Counter statTotalWritten1min;
+
+    // exceptions
+
+    final Gauge sysExceptions;
+
+    final Gauge sourceExceptions;
+
+    // As an optimization: cache the labelled children so the hot path skips the label lookup.
+    private final Counter.Child _statTotalRecordsReceived;
+    private final Counter.Child _statTotalSysExceptions;
+    private final Counter.Child _statTotalSourceExceptions;
+    private final Counter.Child _statTotalWritten;
+    private final Gauge.Child _statlastInvocation;
+
+    // Not final: reset() clears the parent counters and re-acquires these children.
+    private Counter.Child _statTotalRecordsReceived1min;
+    private Counter.Child _statTotalSysExceptions1min;
+    private Counter.Child _statTotalSourceExceptions1min;
+    private Counter.Child _statTotalWritten1min;
+
+    @Getter
+    private final EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
+            latestSystemExceptions = EvictingQueue.create(10);
+    @Getter
+    private final EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
+            latestSourceExceptions = EvictingQueue.create(10);
+
+    protected final RateLimiter sysExceptionRateLimiter;
+
+    protected final RateLimiter sourceExceptionRateLimiter;
+
+    /**
+     * Registers every source metric with the given registry and binds the label values.
+     *
+     * @param collectorRegistry registry the counters and gauges are registered with
+     * @param metricsLabels label values applied to every non-exception metric
+     * @param scheduledExecutorService executor handed to the base class and to the two
+     *        exception rate limiters
+     */
+    public SourceStatsManager(CollectorRegistry collectorRegistry, String[] metricsLabels,
+                              ScheduledExecutorService scheduledExecutorService) {
+        super(collectorRegistry, metricsLabels, scheduledExecutorService);
+
+        statTotalRecordsReceived = Counter.build()
+                .name(PULSAR_SOURCE_METRICS_PREFIX + RECEIVED_TOTAL)
+                .help("Total number of records received from source.")
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
+        _statTotalRecordsReceived = statTotalRecordsReceived.labels(metricsLabels);
+
+        statTotalSysExceptions = Counter.build()
+                .name(PULSAR_SOURCE_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL)
+                .help("Total number of system exceptions.")
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
+        _statTotalSysExceptions = statTotalSysExceptions.labels(metricsLabels);
+
+        statTotalSourceExceptions = Counter.build()
+                .name(PULSAR_SOURCE_METRICS_PREFIX + SOURCE_EXCEPTIONS_TOTAL)
+                .help("Total number of source exceptions.")
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
+        _statTotalSourceExceptions = statTotalSourceExceptions.labels(metricsLabels);
+
+        statTotalWritten = Counter.build()
+                .name(PULSAR_SOURCE_METRICS_PREFIX + WRITTEN_TOTAL)
+                .help("Total number of records written to a Pulsar topic.")
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
+        _statTotalWritten = statTotalWritten.labels(metricsLabels);
+
+        statlastInvocation = Gauge.build()
+                .name(PULSAR_SOURCE_METRICS_PREFIX + LAST_INVOCATION)
+                .help("The timestamp of the last invocation of the source.")
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
+        _statlastInvocation = statlastInvocation.labels(metricsLabels);
+
+        statTotalRecordsReceived1min = Counter.build()
+                .name(PULSAR_SOURCE_METRICS_PREFIX + RECEIVED_TOTAL_1min)
+                .help("Total number of records received from source in the last 1 minute.")
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
+        _statTotalRecordsReceived1min = statTotalRecordsReceived1min.labels(metricsLabels);
+
+        statTotalSysExceptions1min = Counter.build()
+                .name(PULSAR_SOURCE_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL_1min)
+                .help("Total number of system exceptions in the last 1 minute.")
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
+        _statTotalSysExceptions1min = statTotalSysExceptions1min.labels(metricsLabels);
+
+        statTotalSourceExceptions1min = Counter.build()
+                .name(PULSAR_SOURCE_METRICS_PREFIX + SOURCE_EXCEPTIONS_TOTAL_1min)
+                .help("Total number of source exceptions in the last 1 minute.")
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
+        _statTotalSourceExceptions1min = statTotalSourceExceptions1min.labels(metricsLabels);
+
+        statTotalWritten1min = Counter.build()
+                .name(PULSAR_SOURCE_METRICS_PREFIX + WRITTEN_TOTAL_1min)
+                .help("Total number of records written to a Pulsar topic in the last 1 minute.")
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
+        _statTotalWritten1min = statTotalWritten1min.labels(metricsLabels);
+
+        sysExceptions = Gauge.build()
+                .name(PULSAR_SOURCE_METRICS_PREFIX + "system_exception")
+                .labelNames(exceptionMetricsLabelNames)
+                .help("Exception from system code.")
+                .register(collectorRegistry);
+
+        sourceExceptions = Gauge.build()
+                .name(PULSAR_SOURCE_METRICS_PREFIX + "source_exception")
+                .labelNames(exceptionMetricsLabelNames)
+                .help("Exception from source.")
+                .register(collectorRegistry);
+
+        // Each limiter is built with (executor, 5, 1, MINUTES); it gates how often an
+        // exception is published as its own labelled gauge sample.
+        sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
+        sourceExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
+    }
+
+    /**
+     * Clears every {@code *_1min} counter, re-acquires its labelled child, and empties
+     * both recent-exception queues. Cumulative counters are left untouched.
+     */
+    @Override
+    public void reset() {
+        statTotalRecordsReceived1min.clear();
+        _statTotalRecordsReceived1min = statTotalRecordsReceived1min.labels(metricsLabels);
+
+        statTotalSysExceptions1min.clear();
+        _statTotalSysExceptions1min = statTotalSysExceptions1min.labels(metricsLabels);
+
+        statTotalSourceExceptions1min.clear();
+        _statTotalSourceExceptions1min = statTotalSourceExceptions1min.labels(metricsLabels);
+
+        statTotalWritten1min.clear();
+        _statTotalWritten1min = statTotalWritten1min.labels(metricsLabels);
+
+        latestSystemExceptions.clear();
+        latestSourceExceptions.clear();
+    }
+
+    /** Counts one record received from the source (cumulative and 1-minute counters). */
+    @Override
+    public void incrTotalReceived() {
+        _statTotalRecordsReceived.inc();
+        _statTotalRecordsReceived1min.inc();
+    }
+
+    /** Counts one record successfully written (cumulative and 1-minute counters). */
+    @Override
+    public void incrTotalProcessedSuccessfully() {
+        _statTotalWritten.inc();
+        _statTotalWritten1min.inc();
+    }
+
+    /**
+     * Records a system exception: remembers it in the recent-exceptions queue, bumps the
+     * system-exception counters, and, subject to the rate limiter, publishes it as a
+     * labelled gauge sample.
+     *
+     * @param ex the exception to record
+     */
+    @Override
+    public void incrSysExceptions(Throwable ex) {
+        long ts = System.currentTimeMillis();
+        latestSystemExceptions.add(getExceptionInfo(ex, ts));
+
+        // Count every occurrence; previously these counters were never incremented, so
+        // getTotalSysExceptions() and getTotalSysExceptions1min() always reported 0.
+        _statTotalSysExceptions.inc();
+        _statTotalSysExceptions1min.inc();
+
+        // report exception through Prometheus
+        if (sysExceptionRateLimiter.tryAcquire()) {
+            sysExceptions.labels(buildExceptionLabels(ex, ts)).set(1.0);
+        }
+    }
+
+    /** Delegates to {@link #incrSysExceptions(Throwable)}. */
+    @Override
+    public void incrUserExceptions(Exception ex) {
+        incrSysExceptions(ex);
+    }
+
+    /**
+     * Records an exception raised by the source: remembers it in the recent-exceptions
+     * queue, bumps the source-exception counters, and, subject to the rate limiter,
+     * publishes it as a labelled gauge sample.
+     *
+     * @param ex the exception to record
+     */
+    @Override
+    public void incrSourceExceptions(Exception ex) {
+        long ts = System.currentTimeMillis();
+        latestSourceExceptions.add(getExceptionInfo(ex, ts));
+
+        // Count every occurrence; previously the source-exception counters stayed at 0.
+        _statTotalSourceExceptions.inc();
+        _statTotalSourceExceptions1min.inc();
+
+        // report exception through Prometheus
+        if (sourceExceptionRateLimiter.tryAcquire()) {
+            sourceExceptions.labels(buildExceptionLabels(ex, ts)).set(1.0);
+        }
+    }
+
+    /** Delegates to {@link #incrSysExceptions(Throwable)}. */
+    @Override
+    public void incrSinkExceptions(Exception ex) {
+        incrSysExceptions(ex);
+    }
+
+    /** Stores {@code ts} in the last-invocation gauge. */
+    @Override
+    public void setLastInvocation(long ts) {
+        _statlastInvocation.set(ts);
+    }
+
+    @Override
+    public void processTimeStart() {
+        //no-op
+    }
+
+    @Override
+    public void processTimeEnd() {
+        //no-op
+    }
+
+    /** Returns the cumulative written-records count. */
+    @Override
+    public double getTotalProcessedSuccessfully() {
+        return _statTotalWritten.get();
+    }
+
+    /** Returns the cumulative received-records count. */
+    @Override
+    public double getTotalRecordsReceived() {
+        return _statTotalRecordsReceived.get();
+    }
+
+    /** Returns the cumulative system-exception count. */
+    @Override
+    public double getTotalSysExceptions() {
+        return _statTotalSysExceptions.get();
+    }
+
+    /** Always returns 0; this class keeps no user-exception counter. */
+    @Override
+    public double getTotalUserExceptions() {
+        return 0;
+    }
+
+    /** Returns the value last stored by {@link #setLastInvocation(long)}. */
+    @Override
+    public double getLastInvocation() {
+        return _statlastInvocation.get();
+    }
+
+    /** Always returns 0; processTimeStart/processTimeEnd are no-ops in this class. */
+    @Override
+    public double getAvgProcessLatency() {
+        return 0;
+    }
+
+    /** Returns the written-records count accumulated since the last {@link #reset()}. */
+    @Override
+    public double getTotalProcessedSuccessfully1min() {
+        return _statTotalWritten1min.get();
+    }
+
+    /** Returns the received-records count accumulated since the last {@link #reset()}. */
+    @Override
+    public double getTotalRecordsReceived1min() {
+        return _statTotalRecordsReceived1min.get();
+    }
+
+    /** Returns the system-exception count accumulated since the last {@link #reset()}. */
+    @Override
+    public double getTotalSysExceptions1min() {
+        return _statTotalSysExceptions1min.get();
+    }
+
+    /** Always returns 0; this class keeps no user-exception counter. */
+    @Override
+    public double getTotalUserExceptions1min() {
+        return 0;
+    }
+
+    /** Always returns 0; processTimeStart/processTimeEnd are no-ops in this class. */
+    @Override
+    public double getAvgProcessLatency1min() {
+        return 0;
+    }
+
+    /** Returns the shared empty queue; this class records no user exceptions. */
+    @Override
+    public EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
+            getLatestUserExceptions() {
+        return EMPTY_QUEUE;
+    }
+
+    /** Returns the queue of recently recorded system exceptions (the live queue, not a copy). */
+    @Override
+    public EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
+            getLatestSystemExceptions() {
+        return latestSystemExceptions;
+    }
+
+    /** Returns the queue of recently recorded source exceptions (the live queue, not a copy). */
+    @Override
+    public EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
+            getLatestSourceExceptions() {
+        return latestSourceExceptions;
+    }
+
+    /** Returns the shared empty queue; sink exceptions are routed to the system queue here. */
+    @Override
+    public EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
+            getLatestSinkExceptions() {
+        return EMPTY_QUEUE;
+    }
+
+    /**
+     * Builds the label values for an exception gauge sample: the regular metric labels
+     * followed by the exception message (empty string when null) and the timestamp.
+     */
+    private String[] buildExceptionLabels(Throwable ex, long ts) {
+        String[] labels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
+        labels[labels.length - 2] = ex.getMessage() != null ? ex.getMessage() : "";
+        labels[labels.length - 1] = String.valueOf(ts);
+        return labels;
+    }
+}
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index 77427f74e1..278cd5a882 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -39,6 +39,7 @@
import org.apache.pulsar.functions.instance.state.StateContextImpl;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import
org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider;
+import org.apache.pulsar.functions.utils.Utils;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
@@ -75,8 +76,8 @@ public void setup() {
logger,
client,
new ArrayList<>(),
- new EnvironmentBasedSecretsProvider(), new CollectorRegistry(),
new String[0]
- );
+ new EnvironmentBasedSecretsProvider(), new CollectorRegistry(),
new String[0],
+ Utils.ComponentType.FUNCTION);
}
@Test(expected = IllegalStateException.class)
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
index 1c5d64d6c7..b50bcd5620 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
@@ -364,4 +364,21 @@ public static String getFullyQualifiedInstanceId(String
tenant, String namespace
String functionName, int
instanceId) {
return String.format("%s/%s/%s:%d", tenant, namespace, functionName,
instanceId);
}
+
+ /**
+  * Kinds of component handled by Pulsar Functions. {@link #toString()} yields the
+  * capitalised display label ("Function", "Source", "Sink") rather than the constant name.
+  */
+ public enum ComponentType {
+     FUNCTION("Function"),
+     SOURCE("Source"),
+     SINK("Sink");
+
+     /** Human-readable label returned by {@link #toString()}. */
+     private final String displayName;
+
+     ComponentType(String displayName) {
+         this.displayName = displayName;
+     }
+
+     /** Returns the display label given at construction. */
+     @Override
+     public String toString() {
+         return displayName;
+     }
+ }
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 0e4b1b17c4..50afdc6d26 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -72,9 +72,9 @@
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.join;
import static org.apache.pulsar.functions.utils.Utils.*;
-import static
org.apache.pulsar.functions.worker.rest.api.ComponentImpl.ComponentType.FUNCTION;
-import static
org.apache.pulsar.functions.worker.rest.api.ComponentImpl.ComponentType.SINK;
-import static
org.apache.pulsar.functions.worker.rest.api.ComponentImpl.ComponentType.SOURCE;
+import static org.apache.pulsar.functions.utils.Utils.ComponentType.FUNCTION;
+import static org.apache.pulsar.functions.utils.Utils.ComponentType.SINK;
+import static org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -122,23 +122,6 @@
@Slf4j
public abstract class ComponentImpl {
- public enum ComponentType {
- FUNCTION("Function"),
- SOURCE("Source"),
- SINK("Sink");
-
- private final String componentName;
-
- ComponentType(String componentName) {
- this.componentName = componentName;
- }
-
- @Override
- public String toString() {
- return componentName;
- }
- }
-
private final AtomicReference<StorageClient> storageClient = new
AtomicReference<>();
protected final Supplier<WorkerService> workerServiceSupplier;
protected final ComponentType componentType;
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index f3e41a1d54..be8fd49e06 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -24,6 +24,7 @@
import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.RestException;
@@ -199,7 +200,7 @@ public FunctionStatus emptyStatus(final int parallelism) {
}
public FunctionsImpl(Supplier<WorkerService> workerServiceSupplier) {
- super(workerServiceSupplier, ComponentType.FUNCTION);
+ super(workerServiceSupplier, Utils.ComponentType.FUNCTION);
}
/**
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
index 846900f183..4a801b2233 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
@@ -26,6 +26,7 @@
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.utils.SinkConfigUtils;
+import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.RestException;
@@ -206,7 +207,7 @@ public SinkStatus emptyStatus(final int parallelism) {
}
public SinkImpl(Supplier<WorkerService> workerServiceSupplier) {
- super(workerServiceSupplier, ComponentType.SINK);
+ super(workerServiceSupplier, Utils.ComponentType.SINK);
}
public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData
getSinkInstanceStatus(final String tenant,
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
index ac096bb3ed..ec7ba305a7 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
@@ -26,6 +26,7 @@
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
+import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.RestException;
@@ -204,7 +205,7 @@ public SourceStatus emptyStatus(final int parallelism) {
}
public SourceImpl(Supplier<WorkerService> workerServiceSupplier) {
- super(workerServiceSupplier, ComponentType.SOURCE);
+ super(workerServiceSupplier, Utils.ComponentType.SOURCE);
}
public SourceStatus getSourceStatus(final String tenant,
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index aa22650417..d8d54c95ee 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -76,6 +76,7 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import static org.apache.pulsar.functions.utils.Utils.ComponentType.FUNCTION;
import static org.apache.pulsar.functions.utils.Utils.mergeJson;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
@@ -179,7 +180,7 @@ public void setup() throws Exception {
FunctionsImpl functions = spy(new FunctionsImpl(() ->
mockedWorkerService));
-
doReturn(ComponentImpl.ComponentType.FUNCTION).when(functions).calculateSubjectType(any());
+ doReturn(FUNCTION).when(functions).calculateSubjectType(any());
this.resource = spy(new FunctionsImplV2(functions));
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index 580345e3c1..7695993114 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -76,6 +76,9 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import static org.apache.pulsar.functions.utils.Utils.ComponentType.FUNCTION;
+import static org.apache.pulsar.functions.utils.Utils.ComponentType.SINK;
+import static org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
@@ -177,7 +180,7 @@ public void setup() throws Exception {
when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
this.resource = spy(new FunctionsImpl(() -> mockedWorkerService));
-
doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(any());
+ doReturn(FUNCTION).when(this.resource).calculateSubjectType(any());
}
//
@@ -1383,9 +1386,9 @@ public void testOnlyGetSources() {
FunctionDetails.newBuilder().setName("test-3").build()).build();
functionMetaDataList.add(f3);
when(mockedManager.listFunctions(eq(tenant),
eq(namespace))).thenReturn(functionMetaDataList);
-
doReturn(ComponentImpl.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
-
doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
-
doReturn(ComponentImpl.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
+ doReturn(SOURCE).when(this.resource).calculateSubjectType(f1);
+ doReturn(FUNCTION).when(this.resource).calculateSubjectType(f2);
+ doReturn(SINK).when(this.resource).calculateSubjectType(f3);
List<String> functionList = listDefaultFunctions();
assertEquals(functions, functionList);
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index bec3ca9d40..9e55800216 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -47,7 +47,6 @@
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.request.RequestResult;
import org.apache.pulsar.functions.worker.rest.RestException;
-import org.apache.pulsar.functions.worker.rest.api.ComponentImpl;
import org.apache.pulsar.functions.worker.rest.api.SinkImpl;
import org.apache.pulsar.io.cassandra.CassandraStringSink;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
@@ -175,7 +174,7 @@ public void setup() throws Exception {
when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
this.resource = spy(new SinkImpl(() -> mockedWorkerService));
-
Mockito.doReturn(ComponentImpl.ComponentType.SINK).when(this.resource).calculateSubjectType(any());
+
Mockito.doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.SINK).when(this.resource).calculateSubjectType(any());
}
//
@@ -1308,9 +1307,9 @@ public void testOnlyGetSinks() {
FunctionDetails.newBuilder().setName("test-3").build()).build();
functionMetaDataList.add(f3);
when(mockedManager.listFunctions(eq(tenant),
eq(namespace))).thenReturn(functionMetaDataList);
-
doReturn(ComponentImpl.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
-
doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
-
doReturn(ComponentImpl.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
+
doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
+
doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
+
doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
List<String> sinkList = listDefaultSinks();
assertEquals(functions, sinkList);
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index 72d23e2658..6ba89140ee 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -50,7 +50,6 @@
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.request.RequestResult;
import org.apache.pulsar.functions.worker.rest.RestException;
-import org.apache.pulsar.functions.worker.rest.api.ComponentImpl;
import org.apache.pulsar.functions.worker.rest.api.SourceImpl;
import org.apache.pulsar.io.twitter.TwitterFireHose;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
@@ -170,7 +169,7 @@ public void setup() throws Exception {
when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
this.resource = spy(new SourceImpl(() -> mockedWorkerService));
-
Mockito.doReturn(ComponentImpl.ComponentType.SOURCE).when(this.resource).calculateSubjectType(any());
+
Mockito.doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE).when(this.resource).calculateSubjectType(any());
}
//
@@ -1322,9 +1321,9 @@ public void testOnlyGetSources() {
FunctionDetails.newBuilder().setName("test-3").build()).build();
functionMetaDataList.add(f3);
when(mockedManager.listFunctions(eq(tenant),
eq(namespace))).thenReturn(functionMetaDataList);
-
doReturn(ComponentImpl.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
-
doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
-
doReturn(ComponentImpl.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
+
doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
+
doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
+
doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
List<String> sourceList = listDefaultSources();
assertEquals(functions, sourceList);
diff --git a/pulsar-io/data-genenator/pom.xml b/pulsar-io/data-generator/pom.xml
similarity index 100%
rename from pulsar-io/data-genenator/pom.xml
rename to pulsar-io/data-generator/pom.xml
diff --git
a/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorPrintSink.java
b/pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorPrintSink.java
similarity index 100%
rename from
pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorPrintSink.java
rename to
pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorPrintSink.java
diff --git
a/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java
b/pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java
similarity index 100%
rename from
pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java
rename to
pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java
diff --git
a/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/Person.java
b/pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/Person.java
similarity index 100%
rename from
pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/Person.java
rename to
pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/Person.java
diff --git
a/pulsar-io/data-genenator/src/main/resources/META-INF/services/pulsar-io.yaml
b/pulsar-io/data-generator/src/main/resources/META-INF/services/pulsar-io.yaml
similarity index 100%
rename from
pulsar-io/data-genenator/src/main/resources/META-INF/services/pulsar-io.yaml
rename to
pulsar-io/data-generator/src/main/resources/META-INF/services/pulsar-io.yaml
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index 628686b558..ea433ceb28 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -42,7 +42,7 @@
<module>kinesis</module>
<module>hdfs3</module>
<module>jdbc</module>
- <module>data-genenator</module>
+ <module>data-generator</module>
<module>elastic-search</module>
<module>kafka-connect-adaptor</module>
<module>debezium</module>
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services