This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5cc4dd8 add sink and source prometheus stats (#3261)
5cc4dd8 is described below
commit 5cc4dd8b759f1bfd0ed76f88e065aee65ac6f857
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Mon Dec 31 12:15:55 2018 -0800
add sink and source prometheus stats (#3261)
* add sink and source prometheus stats
* fixing stuff
---
distribution/io/src/assemble/io.xml | 2 +-
pulsar-broker/pom.xml | 10 +-
.../apache/pulsar/io/PulsarFunctionE2ETest.java | 307 +++++++++++++++++++
.../pulsar/functions/instance/ContextImpl.java | 32 +-
.../pulsar/functions/instance/InstanceUtils.java | 26 ++
.../functions/instance/JavaInstanceRunnable.java | 18 +-
.../instance/stats/ComponentStatsManager.java | 170 +++++++++++
.../instance/{ => stats}/FunctionStatsManager.java | 248 ++++-----------
.../functions/instance/stats/SinkStatsManager.java | 340 +++++++++++++++++++++
.../instance/stats/SourceStatsManager.java | 339 ++++++++++++++++++++
.../pulsar/functions/instance/ContextImplTest.java | 5 +-
.../org/apache/pulsar/functions/utils/Utils.java | 17 ++
.../functions/worker/rest/api/ComponentImpl.java | 23 +-
.../functions/worker/rest/api/FunctionsImpl.java | 3 +-
.../pulsar/functions/worker/rest/api/SinkImpl.java | 3 +-
.../functions/worker/rest/api/SourceImpl.java | 3 +-
.../rest/api/v2/FunctionApiV2ResourceTest.java | 3 +-
.../rest/api/v3/FunctionApiV3ResourceTest.java | 11 +-
.../worker/rest/api/v3/SinkApiV3ResourceTest.java | 9 +-
.../rest/api/v3/SourceApiV3ResourceTest.java | 9 +-
.../{data-genenator => data-generator}/pom.xml | 0
.../io/datagenerator/DataGeneratorPrintSink.java | 0
.../io/datagenerator/DataGeneratorSource.java | 0
.../org/apache/pulsar/io/datagenerator/Person.java | 0
.../resources/META-INF/services/pulsar-io.yaml | 0
pulsar-io/pom.xml | 2 +-
26 files changed, 1337 insertions(+), 243 deletions(-)
diff --git a/distribution/io/src/assemble/io.xml b/distribution/io/src/assemble/io.xml
index a0704c3..10ea149 100644
--- a/distribution/io/src/assemble/io.xml
+++ b/distribution/io/src/assemble/io.xml
@@ -83,7 +83,7 @@
</file>
<file>
- <source>${basedir}/../../pulsar-io/data-genenator/target/pulsar-io-data-generator-${project.version}.nar</source>
+ <source>${basedir}/../../pulsar-io/data-generator/target/pulsar-io-data-generator-${project.version}.nar</source>
<outputDirectory>connectors</outputDirectory>
<fileMode>644</fileMode>
</file>
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 07c9d6f..c451132 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -272,6 +272,13 @@
</dependency>
<dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-data-generator</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
</dependency>
@@ -328,7 +335,8 @@
<echo>copy test examples package</echo>
<mkdir dir="${basedir}/src/test/resources"/>
<copy
file="${basedir}/../pulsar-functions/java-examples/target/pulsar-functions-api-examples.jar"
tofile="${basedir}/src/test/resources/pulsar-functions-api-examples.jar"/>
- </tasks>
+ <copy
file="${basedir}/../pulsar-io/data-generator/target/pulsar-io-data-generator-${project.version}.nar"
tofile="${basedir}/src/test/resources/pulsar-io-data-generator.nar"/>
+ </tasks>
</configuration>
</execution>
</executions>
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 7841453..fa1177d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -40,6 +40,8 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Utils;
+import org.apache.pulsar.common.io.SinkConfig;
+import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.common.policies.data.FunctionStatus;
@@ -65,6 +67,7 @@ import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.URL;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -78,6 +81,7 @@ import java.util.regex.Pattern;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
+import static
org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
@@ -191,6 +195,8 @@ public class PulsarFunctionE2ETest {
propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
admin.tenants().updateTenant(tenant, propAdmin);
+ System.setProperty(JAVA_INSTANCE_JAR_PROPERTY, "");
+
Thread.sleep(100);
}
@@ -257,6 +263,28 @@ public class PulsarFunctionE2ETest {
return functionConfig;
}
+ private static SourceConfig createSourceConfig(String tenant, String
namespace, String functionName, String sinkTopic) {
+ SourceConfig sourceConfig = new SourceConfig();
+ sourceConfig.setTenant(tenant);
+ sourceConfig.setNamespace(namespace);
+ sourceConfig.setName(functionName);
+ sourceConfig.setParallelism(1);
+
sourceConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+ sourceConfig.setTopicName(sinkTopic);
+ return sourceConfig;
+ }
+
+ private static SinkConfig createSinkConfig(String tenant, String
namespace, String functionName, String sourceTopic, String subName) {
+ SinkConfig sinkConfig = new SinkConfig();
+ sinkConfig.setTenant(tenant);
+ sinkConfig.setNamespace(namespace);
+ sinkConfig.setName(functionName);
+ sinkConfig.setParallelism(1);
+
sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+ sinkConfig.setInputs(Collections.singleton(sourceTopic));
+ sinkConfig.setSourceSubscriptionName(subName);
+ return sinkConfig;
+ }
/**
* Validates pulsar sink e2e functionality on functions.
*
@@ -325,6 +353,285 @@ public class PulsarFunctionE2ETest {
}
@Test(timeOut = 20000)
+ public void testPulsarSinkStats() throws Exception {
+ final String namespacePortion = "io";
+ final String replNamespace = tenant + "/" + namespacePortion;
+ final String sourceTopic = "persistent://" + replNamespace + "/input";
+ final String functionName = "PulsarSink-test";
+ final String propertyKey = "key";
+ final String propertyValue = "value";
+ final String subscriptionName = "test-sub";
+ admin.namespaces().createNamespace(replNamespace);
+ Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+ admin.namespaces().setNamespaceReplicationClusters(replNamespace,
clusters);
+
+ // create a producer that creates a topic at broker
+ Producer<String> producer =
pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
+
+ String jarFilePathUrl = Utils.FILE + ":" +
getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
+ SinkConfig sinkConfig = createSinkConfig(tenant, namespacePortion,
functionName, sourceTopic, subscriptionName);
+ admin.sink().createSinkWithUrl(sinkConfig, jarFilePathUrl);
+
+ admin.sink().updateSinkWithUrl(sinkConfig, jarFilePathUrl);
+
+ retryStrategically((test) -> {
+ try {
+ return
admin.topics().getStats(sourceTopic).subscriptions.size() == 1;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 5, 150);
+ // validate pulsar sink consumer has started on the topic
+
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
+
+ // validate prometheus metrics empty
+ String prometheusMetrics = getPrometheusMetrics(brokerWebServicePort);
+ log.info("prometheus metrics: {}", prometheusMetrics);
+
+ Map<String, Metric> metrics = parseMetrics(prometheusMetrics);
+ Metric m = metrics.get("pulsar_sink_received_total");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_sink_received_total_1min");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_sink_written_total");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_sink_written_total_1min");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_sink_sink_exceptions_total");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_sink_sink_exceptions_total_1min");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_sink_system_exceptions_total");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_sink_system_exceptions_total_1min");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_sink_last_invocation");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, 0.0);
+
+ int totalMsgs = 10;
+ for (int i = 0; i < totalMsgs; i++) {
+ String data = "my-message-" + i;
+ producer.newMessage().property(propertyKey,
propertyValue).value(data).send();
+ }
+ retryStrategically((test) -> {
+ try {
+ SubscriptionStats subStats =
admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
+ return subStats.unackedMessages == 0 &&
subStats.msgThroughputOut == totalMsgs;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 5, 200);
+
+ // get stats after producing
+ prometheusMetrics = getPrometheusMetrics(brokerWebServicePort);
+ log.info("prometheusMetrics: {}", prometheusMetrics);
+
+ metrics = parseMetrics(prometheusMetrics);
+ m = metrics.get("pulsar_sink_received_total");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, (double) totalMsgs);
+ m = metrics.get("pulsar_sink_received_total_1min");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, (double) totalMsgs);
+ m = metrics.get("pulsar_sink_written_total");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, (double) totalMsgs);
+ m = metrics.get("pulsar_sink_written_total_1min");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, (double) totalMsgs);
+ m = metrics.get("pulsar_sink_sink_exceptions_total");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_sink_sink_exceptions_total_1min");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_sink_system_exceptions_total");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_sink_system_exceptions_total_1min");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_sink_last_invocation");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertTrue(m.value > 0.0);
+ }
+
+ @Test(timeOut = 20000)
+ public void testPulsarSourceStats() throws Exception {
+ final String namespacePortion = "io";
+ final String replNamespace = tenant + "/" + namespacePortion;
+ final String sinkTopic = "persistent://" + replNamespace + "/output";
+ final String functionName = "PulsarSource-test";
+ admin.namespaces().createNamespace(replNamespace);
+ Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+ admin.namespaces().setNamespaceReplicationClusters(replNamespace,
clusters);
+
+ String jarFilePathUrl = Utils.FILE + ":" +
getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
+ SourceConfig sourceConfig = createSourceConfig(tenant,
namespacePortion, functionName, sinkTopic);
+ admin.source().createSourceWithUrl(sourceConfig, jarFilePathUrl);
+
+ admin.source().updateSourceWithUrl(sourceConfig, jarFilePathUrl);
+
+ retryStrategically((test) -> {
+ try {
+ return (admin.topics().getStats(sinkTopic).publishers.size()
== 1) && (admin.topics().getInternalStats(sinkTopic).numberOfEntries > 4);
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 10, 150);
+ assertEquals(admin.topics().getStats(sinkTopic).publishers.size(), 1);
+
+ String prometheusMetrics = getPrometheusMetrics(brokerWebServicePort);
+ log.info("prometheusMetrics: {}", prometheusMetrics);
+
+ Map<String, Metric> metrics = parseMetrics(prometheusMetrics);
+ Metric m = metrics.get("pulsar_source_received_total");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertTrue(m.value > 0.0);
+ m = metrics.get("pulsar_source_received_total_1min");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertTrue(m.value > 0.0);
+ m = metrics.get("pulsar_source_written_total");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertTrue(m.value > 0.0);
+ m = metrics.get("pulsar_source_written_total_1min");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertTrue(m.value > 0.0);
+ m = metrics.get("pulsar_source_source_exceptions_total");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_source_source_exceptions_total_1min");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_source_system_exceptions_total");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_source_system_exceptions_total_1min");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertEquals(m.value, 0.0);
+ m = metrics.get("pulsar_source_last_invocation");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("function"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
+ assertEquals(m.tags.get("fqfn"),
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion,
functionName));
+ assertTrue(m.value > 0.0);
+ }
+
+ @Test(timeOut = 20000)
public void testPulsarFunctionStats() throws Exception {
final String namespacePortion = "io";
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 69fd6c8..460ba75 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -34,9 +34,14 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.state.StateContextImpl;
+import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
+import org.apache.pulsar.functions.instance.stats.FunctionStatsManager;
+import org.apache.pulsar.functions.instance.stats.SinkStatsManager;
+import org.apache.pulsar.functions.instance.stats.SourceStatsManager;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.source.TopicSchema;
+import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;
@@ -52,7 +57,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkState;
-import static
org.apache.pulsar.functions.instance.FunctionStatsManager.USER_METRIC_PREFIX;
+import static
org.apache.pulsar.functions.instance.stats.FunctionStatsManager.USER_METRIC_PREFIX;
/**
* This class implements the Context interface exposed to the user.
@@ -87,12 +92,13 @@ class ContextImpl implements Context, SinkContext,
SourceContext {
private final static String[] userMetricsLabelNames;
static {
// add label to indicate user metric
- userMetricsLabelNames =
Arrays.copyOf(FunctionStatsManager.metricsLabelNames,
FunctionStatsManager.metricsLabelNames.length + 1);
- userMetricsLabelNames[FunctionStatsManager.metricsLabelNames.length] =
"metric";
+ userMetricsLabelNames =
Arrays.copyOf(ComponentStatsManager.metricsLabelNames,
ComponentStatsManager.metricsLabelNames.length + 1);
+ userMetricsLabelNames[ComponentStatsManager.metricsLabelNames.length]
= "metric";
}
public ContextImpl(InstanceConfig config, Logger logger, PulsarClient
client, List<String> inputTopics,
- SecretsProvider secretsProvider, CollectorRegistry
collectorRegistry, String[] metricsLabels) {
+ SecretsProvider secretsProvider, CollectorRegistry
collectorRegistry, String[] metricsLabels,
+ Utils.ComponentType componentType) {
this.config = config;
this.logger = logger;
this.publishProducers = new HashMap<>();
@@ -119,9 +125,23 @@ class ContextImpl implements Context, SinkContext,
SourceContext {
}
this.metricsLabels = metricsLabels;
+ String prefix;
+ switch (componentType) {
+ case FUNCTION:
+ prefix = FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX;
+ break;
+ case SINK:
+ prefix = SinkStatsManager.PULSAR_SINK_METRICS_PREFIX;
+ break;
+ case SOURCE:
+ prefix = SourceStatsManager.PULSAR_SOURCE_METRICS_PREFIX;
+ break;
+ default:
+ throw new RuntimeException("Unknown component type: " +
componentType);
+ }
this.userMetricsSummary = Summary.build()
- .name("pulsar_function_user_metric")
- .help("Pulsar Function user defined metric.")
+ .name(prefix + ComponentStatsManager.USER_METRIC_PREFIX)
+ .help("User defined metric.")
.labelNames(userMetricsLabelNames)
.quantile(0.5, 0.01)
.quantile(0.9, 0.01)
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
index 86a9aa2..9e32736 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
@@ -19,15 +19,22 @@
package org.apache.pulsar.functions.instance;
import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.pulsar.functions.utils.Utils.ComponentType.FUNCTION;
+import static org.apache.pulsar.functions.utils.Utils.ComponentType.SINK;
+import static org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE;
import lombok.experimental.UtilityClass;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.functions.api.SerDe;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.utils.Reflections;
import net.jodah.typetools.TypeResolver;
+import org.apache.pulsar.functions.utils.Utils;
@UtilityClass
public class InstanceUtils {
@@ -77,4 +84,23 @@ public class InstanceUtils {
return Reflections.createInstance(className, baseClass, clsLoader);
}
}
+
+ public Utils.ComponentType calculateSubjectType(Function.FunctionDetails
functionDetails) {
+ Function.SourceSpec sourceSpec = functionDetails.getSource();
+ Function.SinkSpec sinkSpec = functionDetails.getSink();
+ if (sourceSpec.getInputSpecsCount() == 0) {
+ return SOURCE;
+ }
+ // Now its between sink and function
+
+ if (!isEmpty(sinkSpec.getBuiltin())) {
+ // if its built in, its a sink
+ return SINK;
+ }
+
+ if (isEmpty(sinkSpec.getClassName()) ||
sinkSpec.getClassName().equals(PulsarSink.class.getName())) {
+ return FUNCTION;
+ }
+ return SINK;
+ }
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index e858e90..7c36b58 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -55,6 +55,8 @@ import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.state.StateContextImpl;
+import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
+import org.apache.pulsar.functions.instance.stats.FunctionStatsManager;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.proto.InstanceCommunication;
@@ -116,7 +118,7 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
// function stats
@Getter
- private FunctionStatsManager stats;
+ private ComponentStatsManager stats;
private Record<?> currentRecord;
@@ -202,7 +204,8 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
}
Logger instanceLog = LoggerFactory.getLogger(
"function-" + instanceConfig.getFunctionDetails().getName());
- return new ContextImpl(instanceConfig, instanceLog, client,
inputTopics, secretsProvider, collectorRegistry, metricsLabels);
+ return new ContextImpl(instanceConfig, instanceLog, client,
inputTopics, secretsProvider,
+ collectorRegistry, metricsLabels,
InstanceUtils.calculateSubjectType(this.instanceConfig.getFunctionDetails()));
}
/**
@@ -216,7 +219,9 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
if (this.collectorRegistry == null) {
this.collectorRegistry = new CollectorRegistry();
}
- this.stats = new FunctionStatsManager(this.collectorRegistry,
this.metricsLabels, this.instanceCache.getScheduledExecutorService());
+ this.stats =
ComponentStatsManager.getStatsManager(this.collectorRegistry,
this.metricsLabels,
+ this.instanceCache.getScheduledExecutorService(),
+
InstanceUtils.calculateSubjectType(this.instanceConfig.getFunctionDetails()));
ContextImpl contextImpl = setupContext();
javaInstance = setupJavaInstance(contextImpl);
@@ -284,6 +289,7 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
private void loadJars() throws Exception {
try {
+ log.info("jarFile: {}", jarFile);
// Let's first try to treat it as a nar archive
fnCache.registerFunctionInstanceWithArchive(
instanceConfig.getFunctionId(),
@@ -546,6 +552,12 @@ public class JavaInstanceRunnable implements
AutoCloseable, Runnable {
stats.getLatestSystemExceptions().forEach(ex -> {
functionStatusBuilder.addLatestSystemExceptions(ex);
});
+ stats.getLatestSourceExceptions().forEach(ex -> {
+ functionStatusBuilder.addLatestSourceExceptions(ex);
+ });
+ stats.getLatestSinkExceptions().forEach(ex -> {
+ functionStatusBuilder.addLatestSinkExceptions(ex);
+ });
functionStatusBuilder.setAverageLatency(stats.getAvgProcessLatency());
functionStatusBuilder.setLastInvocationTime((long)
stats.getLastInvocation());
return functionStatusBuilder;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
new file mode 100644
index 0000000..3cb654b
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.stats;
+
+import com.google.common.collect.EvictingQueue;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.common.TextFormat;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.utils.Utils;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Arrays;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public abstract class ComponentStatsManager implements AutoCloseable {
+
+ protected String[] metricsLabels;
+
+ protected ScheduledFuture<?> scheduledFuture;
+
+ protected final CollectorRegistry collectorRegistry;
+
+    /**
+     * Zero-capacity queue that subclasses return for exception categories they do not track.
+     * Parameterized (rather than raw) so returning it from the typed getters is not an unchecked conversion.
+     */
+    protected final EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> EMPTY_QUEUE =
+            EvictingQueue.create(0);
+
+ public final static String USER_METRIC_PREFIX = "user_metric_";
+
+ public static final String[] metricsLabelNames = {"tenant", "namespace",
"function", "instance_id", "cluster", "fqfn"};
+
+ protected static final String[] exceptionMetricsLabelNames;
+
+ static {
+ exceptionMetricsLabelNames = Arrays.copyOf(metricsLabelNames,
metricsLabelNames.length + 2);
+ exceptionMetricsLabelNames[metricsLabelNames.length] = "error";
+ exceptionMetricsLabelNames[metricsLabelNames.length + 1] = "ts";
+ }
+
+    /**
+     * Creates the stats manager implementation that matches the given component type.
+     *
+     * @param collectorRegistry        registry handed to the created manager
+     * @param metricsLabels            label values handed to the created manager
+     * @param scheduledExecutorService executor handed to the created manager
+     * @param componentType            kind of component to create a manager for
+     * @return a new {@link FunctionStatsManager}, {@link SourceStatsManager} or {@link SinkStatsManager}
+     * @throws RuntimeException if the component type is none of FUNCTION, SOURCE or SINK
+     */
+    public static ComponentStatsManager getStatsManager(CollectorRegistry collectorRegistry,
+                                                        String[] metricsLabels,
+                                                        ScheduledExecutorService scheduledExecutorService,
+                                                        Utils.ComponentType componentType) {
+        final ComponentStatsManager statsManager;
+        switch (componentType) {
+            case SINK:
+                statsManager = new SinkStatsManager(collectorRegistry, metricsLabels, scheduledExecutorService);
+                break;
+            case SOURCE:
+                statsManager = new SourceStatsManager(collectorRegistry, metricsLabels, scheduledExecutorService);
+                break;
+            case FUNCTION:
+                statsManager = new FunctionStatsManager(collectorRegistry, metricsLabels, scheduledExecutorService);
+                break;
+            default:
+                throw new RuntimeException("Unknown component type: " + componentType);
+        }
+        return statsManager;
+    }
+
+    /**
+     * Stores the registry and label values and schedules {@link #reset()} to run once a minute.
+     *
+     * @param collectorRegistry        registry kept for later metric export
+     * @param metricsLabels            label values kept for use by subclasses
+     * @param scheduledExecutorService executor on which the periodic reset is scheduled
+     */
+    public ComponentStatsManager(CollectorRegistry collectorRegistry,
+                                 String[] metricsLabels,
+                                 ScheduledExecutorService scheduledExecutorService) {
+
+        this.metricsLabels = metricsLabels;
+        this.collectorRegistry = collectorRegistry;
+
+        // First run after one minute, then every minute; a failing reset is logged, not propagated,
+        // so the periodic task keeps running.
+        this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(() -> {
+            try {
+                reset();
+            } catch (Exception e) {
+                log.error("Failed to reset metrics for 1min window", e);
+            }
+        }, 1, 1, TimeUnit.MINUTES);
+    }
+
+ public abstract void reset();
+
+ public abstract void incrTotalReceived();
+
+ public abstract void incrTotalProcessedSuccessfully();
+
+ public abstract void incrSysExceptions(Throwable sysException);
+
+ public abstract void incrUserExceptions(Exception userException);
+
+ public abstract void incrSourceExceptions(Exception userException);
+
+ public abstract void incrSinkExceptions(Exception userException);
+
+ public abstract void setLastInvocation(long ts);
+
+ public abstract void processTimeStart();
+
+ public abstract void processTimeEnd();
+
+ public abstract double getTotalProcessedSuccessfully();
+
+ public abstract double getTotalRecordsReceived();
+
+ public abstract double getTotalSysExceptions();
+
+ public abstract double getTotalUserExceptions();
+
+ public abstract double getLastInvocation();
+
+ public abstract double getAvgProcessLatency();
+
+ public abstract double getTotalProcessedSuccessfully1min();
+
+ public abstract double getTotalRecordsReceived1min();
+
+ public abstract double getTotalSysExceptions1min();
+
+ public abstract double getTotalUserExceptions1min();
+
+ public abstract double getAvgProcessLatency1min();
+
+ public abstract
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
getLatestUserExceptions();
+
+ public abstract
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
getLatestSystemExceptions();
+
+ public abstract
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
getLatestSourceExceptions();
+
+ public abstract
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
getLatestSinkExceptions();
+
+    /**
+     * Renders all metric samples currently held by the collector registry as text.
+     *
+     * @return the output of {@code TextFormat.write004} for the registry's metric family samples
+     * @throws IOException if writing the text representation fails
+     */
+    public String getStatsAsString() throws IOException {
+        final StringWriter buffer = new StringWriter();
+        TextFormat.write004(buffer, collectorRegistry.metricFamilySamples());
+        return buffer.toString();
+    }
+
+ protected InstanceCommunication.FunctionStatus.ExceptionInformation
getExceptionInfo(Throwable th, long ts) {
+ InstanceCommunication.FunctionStatus.ExceptionInformation.Builder
exceptionInfoBuilder =
+
InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder().setMsSinceEpoch(ts);
+ String msg = th.getMessage();
+ if (msg != null) {
+ exceptionInfoBuilder.setExceptionString(msg);
+ }
+ return exceptionInfoBuilder.build();
+ }
+
+
+    /**
+     * Cancels the periodic reset task, if one is still registered, without interrupting a run in progress.
+     */
+    @Override
+    public void close() {
+        if (scheduledFuture == null) {
+            return;
+        }
+        scheduledFuture.cancel(false);
+        scheduledFuture = null;
+    }
+}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
similarity index 64%
rename from
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
rename to
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
index 818fdb3..d03bfe2 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
@@ -16,25 +16,21 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.functions.instance;
+package org.apache.pulsar.functions.instance.stats;
import com.google.common.collect.EvictingQueue;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import io.prometheus.client.Summary;
-import io.prometheus.client.exporter.common.TextFormat;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.util.RateLimiter;
import org.apache.pulsar.functions.proto.InstanceCommunication;
-import java.io.IOException;
-import java.io.StringWriter;
import java.util.Arrays;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
@@ -43,18 +39,9 @@ import java.util.concurrent.TimeUnit;
@Slf4j
@Getter
@Setter
-public class FunctionStatsManager implements AutoCloseable {
-
- static final String[] metricsLabelNames = {"tenant", "namespace",
"function", "instance_id", "cluster", "fqfn"};
- static final String[] exceptionMetricsLabelNames;
- static {
- exceptionMetricsLabelNames = Arrays.copyOf(metricsLabelNames,
metricsLabelNames.length + 2);
- exceptionMetricsLabelNames[metricsLabelNames.length] = "error";
- exceptionMetricsLabelNames[metricsLabelNames.length + 1] = "ts";
- }
+public class FunctionStatsManager extends ComponentStatsManager{
public static final String PULSAR_FUNCTION_METRICS_PREFIX =
"pulsar_function_";
- public final static String USER_METRIC_PREFIX = "user_metric_";
/** Declare metric names **/
public static final String PROCESSED_SUCCESSFULLY_TOTAL =
"processed_successfully_total";
@@ -81,16 +68,12 @@ public class FunctionStatsManager implements AutoCloseable {
final Counter statTotalSysExceptions;
final Counter statTotalUserExceptions;
-
- final Counter statTotalSourceExceptions;
-
- final Counter statTotalSinkExceptions;
-
+
final Summary statProcessLatency;
final Gauge statlastInvocation;
- final Counter statTotalRecordsRecieved;
+ final Counter statTotalRecordsReceived;
// windowed metrics
@@ -99,14 +82,10 @@ public class FunctionStatsManager implements AutoCloseable {
final Counter statTotalSysExceptions1min;
final Counter statTotalUserExceptions1min;
-
- final Counter statTotalSourceExceptions1min;
-
- final Counter statTotalSinkExceptions1min;
-
+
final Summary statProcessLatency1min;
- final Counter statTotalRecordsRecieved1min;
+ final Counter statTotalRecordsReceived1min;
// exceptions
@@ -122,47 +101,28 @@ public class FunctionStatsManager implements
AutoCloseable {
private final Counter.Child _statTotalProcessedSuccessfully;
private final Counter.Child _statTotalSysExceptions;
private final Counter.Child _statTotalUserExceptions;
- private final Counter.Child _statTotalSourceExceptions;
- private final Counter.Child _statTotalSinkExceptions;
private final Summary.Child _statProcessLatency;
private final Gauge.Child _statlastInvocation;
- private final Counter.Child _statTotalRecordsRecieved;
+ private final Counter.Child _statTotalRecordsReceived;
private Counter.Child _statTotalProcessedSuccessfully1min;
private Counter.Child _statTotalSysExceptions1min;
private Counter.Child _statTotalUserExceptions1min;
- private Counter.Child _statTotalSourceExceptions1min;
- private Counter.Child _statTotalSinkExceptions1min;
private Summary.Child _statProcessLatency1min;
- private Counter.Child _statTotalRecordsRecieved1min;
-
- private String[] metricsLabels;
-
- private ScheduledFuture<?> scheduledFuture;
-
- private final CollectorRegistry collectorRegistry;
+ private Counter.Child _statTotalRecordsReceived1min;
@Getter
private
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
latestUserExceptions = EvictingQueue.create(10);
@Getter
private
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
latestSystemExceptions = EvictingQueue.create(10);
- @Getter
- private
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
latestSourceExceptions = EvictingQueue.create(10);
- @Getter
- private
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
latestSinkExceptions = EvictingQueue.create(10);
private final RateLimiter userExceptionRateLimiter;
private final RateLimiter sysExceptionRateLimiter;
- private final RateLimiter sourceExceptionRateLimiter;
-
- private final RateLimiter sinkExceptionRateLimiter;
-
- public FunctionStatsManager(CollectorRegistry collectorRegistry, String[]
metricsLabels, ScheduledExecutorService scheduledExecutorService) {
-
- this.collectorRegistry = collectorRegistry;
-
- this.metricsLabels = metricsLabels;
+ public FunctionStatsManager(CollectorRegistry collectorRegistry,
+ String[] metricsLabels,
+ ScheduledExecutorService
scheduledExecutorService) {
+ super(collectorRegistry, metricsLabels, scheduledExecutorService);
statTotalProcessedSuccessfully = Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX +
PROCESSED_SUCCESSFULLY_TOTAL)
@@ -185,20 +145,6 @@ public class FunctionStatsManager implements AutoCloseable
{
.register(collectorRegistry);
_statTotalUserExceptions =
statTotalUserExceptions.labels(metricsLabels);
- statTotalSourceExceptions = Counter.build()
- .name(PULSAR_FUNCTION_METRICS_PREFIX + SOURCE_EXCEPTIONS_TOTAL)
- .help("Total number of source exceptions.")
- .labelNames(metricsLabelNames)
- .register(collectorRegistry);
- _statTotalSourceExceptions =
statTotalSourceExceptions.labels(metricsLabels);
-
- statTotalSinkExceptions = Counter.build()
- .name(PULSAR_FUNCTION_METRICS_PREFIX + SINK_EXCEPTIONS_TOTAL)
- .help("Total number of sink exceptions.")
- .labelNames(metricsLabelNames)
- .register(collectorRegistry);
- _statTotalSinkExceptions =
statTotalSinkExceptions.labels(metricsLabels);
-
statProcessLatency = Summary.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS)
.help("Process latency in milliseconds.")
@@ -217,12 +163,12 @@ public class FunctionStatsManager implements
AutoCloseable {
.register(collectorRegistry);
_statlastInvocation = statlastInvocation.labels(metricsLabels);
- statTotalRecordsRecieved = Counter.build()
+ statTotalRecordsReceived = Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL)
.help("Total number of messages received from source.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
- _statTotalRecordsRecieved =
statTotalRecordsRecieved.labels(metricsLabels);
+ _statTotalRecordsReceived =
statTotalRecordsReceived.labels(metricsLabels);
statTotalProcessedSuccessfully1min = Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX +
PROCESSED_SUCCESSFULLY_TOTAL_1min)
@@ -245,20 +191,6 @@ public class FunctionStatsManager implements AutoCloseable
{
.register(collectorRegistry);
_statTotalUserExceptions1min =
statTotalUserExceptions1min.labels(metricsLabels);
- statTotalSourceExceptions1min = Counter.build()
- .name(PULSAR_FUNCTION_METRICS_PREFIX +
SOURCE_EXCEPTIONS_TOTAL_1min)
- .help("Total number of source exceptions in the last 1
minute.")
- .labelNames(metricsLabelNames)
- .register(collectorRegistry);
- _statTotalSourceExceptions1min =
statTotalSourceExceptions1min.labels(metricsLabels);
-
- statTotalSinkExceptions1min = Counter.build()
- .name(PULSAR_FUNCTION_METRICS_PREFIX +
SINK_EXCEPTIONS_TOTAL_1min)
- .help("Total number of sink exceptions in the last 1 minute.")
- .labelNames(metricsLabelNames)
- .register(collectorRegistry);
- _statTotalSinkExceptions1min =
statTotalSinkExceptions1min.labels(metricsLabels);
-
statProcessLatency1min = Summary.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS_1min)
.help("Process latency in milliseconds in the last 1 minute.")
@@ -270,12 +202,12 @@ public class FunctionStatsManager implements
AutoCloseable {
.register(collectorRegistry);
_statProcessLatency1min = statProcessLatency1min.labels(metricsLabels);
- statTotalRecordsRecieved1min = Counter.build()
+ statTotalRecordsReceived1min = Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL_1min)
.help("Total number of messages received from source in the
last 1 minute.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
- _statTotalRecordsRecieved1min =
statTotalRecordsRecieved1min.labels(metricsLabels);
+ _statTotalRecordsReceived1min =
statTotalRecordsReceived1min.labels(metricsLabels);
userExceptions = Gauge.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + "user_exception")
@@ -300,22 +232,8 @@ public class FunctionStatsManager implements AutoCloseable
{
.help("Exception from sink.")
.register(collectorRegistry);
- scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(new
Runnable() {
- @Override
- public void run() {
- try {
- reset();
- } catch (Exception e) {
- log.error("Failed to reset metrics for 1min window", e);
- }
- }
- }, 1, 1, TimeUnit.MINUTES);
-
userExceptionRateLimiter = new RateLimiter(scheduledExecutorService,
5, 1, TimeUnit.MINUTES);
sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5,
1, TimeUnit.MINUTES);
- sourceExceptionRateLimiter = new RateLimiter(scheduledExecutorService,
5, 1, TimeUnit.MINUTES);
- sinkExceptionRateLimiter = new RateLimiter(scheduledExecutorService,
5, 1, TimeUnit.MINUTES);
-
}
public void addUserException(Exception ex) {
@@ -346,87 +264,54 @@ public class FunctionStatsManager implements
AutoCloseable {
}
}
- public void addSourceException(Throwable ex) {
- long ts = System.currentTimeMillis();
- InstanceCommunication.FunctionStatus.ExceptionInformation info =
getExceptionInfo(ex, ts);
- latestSourceExceptions.add(info);
-
- // report exception throw prometheus
- if (sourceExceptionRateLimiter.tryAcquire()) {
- String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels,
metricsLabels.length + 2);
- exceptionMetricsLabels[exceptionMetricsLabels.length - 2] =
ex.getMessage() != null ? ex.getMessage() : "";
- exceptionMetricsLabels[exceptionMetricsLabels.length - 1] =
String.valueOf(ts);
- sourceExceptions.labels(exceptionMetricsLabels).set(1.0);
- }
- }
-
- public void addSinkException(Throwable ex) {
- long ts = System.currentTimeMillis();
- InstanceCommunication.FunctionStatus.ExceptionInformation info =
getExceptionInfo(ex, ts);
- latestSinkExceptions.add(info);
-
- // report exception throw prometheus
- if (sinkExceptionRateLimiter.tryAcquire()) {
- String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels,
metricsLabels.length + 2);
- exceptionMetricsLabels[exceptionMetricsLabels.length - 2] =
ex.getMessage() != null ? ex.getMessage() : "";
- exceptionMetricsLabels[exceptionMetricsLabels.length - 1] =
String.valueOf(ts);
- sinkExceptions.labels(exceptionMetricsLabels).set(1.0);
- }
- }
-
- private InstanceCommunication.FunctionStatus.ExceptionInformation
getExceptionInfo(Throwable th, long ts) {
- InstanceCommunication.FunctionStatus.ExceptionInformation.Builder
exceptionInfoBuilder =
-
InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder().setMsSinceEpoch(ts);
- String msg = th.getMessage();
- if (msg != null) {
- exceptionInfoBuilder.setExceptionString(msg);
- }
- return exceptionInfoBuilder.build();
- }
-
+ @Override
public void incrTotalReceived() {
- _statTotalRecordsRecieved.inc();
- _statTotalRecordsRecieved1min.inc();
+ _statTotalRecordsReceived.inc();
+ _statTotalRecordsReceived1min.inc();
}
+ @Override
public void incrTotalProcessedSuccessfully() {
_statTotalProcessedSuccessfully.inc();
_statTotalProcessedSuccessfully1min.inc();
}
+ @Override
public void incrSysExceptions(Throwable sysException) {
_statTotalSysExceptions.inc();
_statTotalSysExceptions1min.inc();
addSystemException(sysException);
}
+ @Override
public void incrUserExceptions(Exception userException) {
_statTotalUserExceptions.inc();
_statTotalUserExceptions1min.inc();
addUserException(userException);
}
- public void incrSourceExceptions(Exception userException) {
- _statTotalSourceExceptions.inc();
- _statTotalSourceExceptions1min.inc();
- addSourceException(userException);
+ @Override
+ public void incrSourceExceptions(Exception ex) {
+ incrSysExceptions(ex);
}
- public void incrSinkExceptions(Exception userException) {
- _statTotalSinkExceptions.inc();
- _statTotalSinkExceptions1min.inc();
- addSinkException(userException);
+ @Override
+ public void incrSinkExceptions(Exception ex) {
+ incrSysExceptions(ex);
}
+ @Override
public void setLastInvocation(long ts) {
_statlastInvocation.set(ts);
}
private Long processTimeStart;
+ @Override
public void processTimeStart() {
processTimeStart = System.nanoTime();
}
+ @Override
public void processTimeEnd() {
if (processTimeStart != null) {
double endTimeMs = ((double) System.nanoTime() - processTimeStart)
/ 1.0E6D;
@@ -435,30 +320,27 @@ public class FunctionStatsManager implements
AutoCloseable {
}
}
+ @Override
public double getTotalProcessedSuccessfully() {
return _statTotalProcessedSuccessfully.get();
}
+ @Override
public double getTotalRecordsReceived() {
- return _statTotalRecordsRecieved.get();
+ return _statTotalRecordsReceived.get();
}
+ @Override
public double getTotalSysExceptions() {
return _statTotalSysExceptions.get();
}
+ @Override
public double getTotalUserExceptions() {
return _statTotalUserExceptions.get();
}
-
- public double getTotalSourceExceptions() {
- return _statTotalSourceExceptions.get();
- }
-
- public double getTotalSinkExceptions() {
- return _statTotalSinkExceptions.get();
- }
-
+
+ @Override
public double getLastInvocation() {
return _statlastInvocation.get();
}
@@ -484,33 +366,40 @@ public class FunctionStatsManager implements
AutoCloseable {
return _statProcessLatency.get().quantiles.get(0.999);
}
+ @Override
public double getTotalProcessedSuccessfully1min() {
return _statTotalProcessedSuccessfully1min.get();
}
+ @Override
public double getTotalRecordsReceived1min() {
- return _statTotalRecordsRecieved1min.get();
+ return _statTotalRecordsReceived1min.get();
}
+ @Override
public double getTotalSysExceptions1min() {
return _statTotalSysExceptions1min.get();
}
+ @Override
public double getTotalUserExceptions1min() {
return _statTotalUserExceptions1min.get();
}
-
- public double getTotalSourceExceptions1min() {
- return _statTotalSourceExceptions1min.get();
+
+ @Override
+ public double getAvgProcessLatency1min() {
+ return _statProcessLatency1min.get().count <= 0.0
+ ? 0 : _statProcessLatency1min.get().sum /
_statProcessLatency1min.get().count;
}
- public double getTotalSinkExceptions1min() {
- return _statTotalSinkExceptions1min.get();
+ @Override
+ public
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
getLatestSourceExceptions() {
+ return EMPTY_QUEUE;
}
- public double getAvgProcessLatency1min() {
- return _statProcessLatency1min.get().count <= 0.0
- ? 0 : _statProcessLatency1min.get().sum /
_statProcessLatency1min.get().count;
+ @Override
+ public
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
getLatestSinkExceptions() {
+ return EMPTY_QUEUE;
}
public double getProcessLatency50P1min() {
@@ -529,6 +418,7 @@ public class FunctionStatsManager implements AutoCloseable {
return _statProcessLatency1min.get().quantiles.get(0.999);
}
+ @Override
public void reset() {
statTotalProcessedSuccessfully1min.clear();
_statTotalProcessedSuccessfully1min =
statTotalProcessedSuccessfully1min.labels(metricsLabels);
@@ -539,37 +429,13 @@ public class FunctionStatsManager implements
AutoCloseable {
statTotalUserExceptions1min.clear();
_statTotalUserExceptions1min =
statTotalUserExceptions1min.labels(metricsLabels);
- statTotalSourceExceptions1min.clear();
- _statTotalSourceExceptions1min =
statTotalSourceExceptions1min.labels(metricsLabels);
-
- statTotalSinkExceptions1min.clear();
- _statTotalSinkExceptions1min =
statTotalSinkExceptions1min.labels(metricsLabels);
-
statProcessLatency1min.clear();
_statProcessLatency1min = statProcessLatency1min.labels(metricsLabels);
- statTotalRecordsRecieved1min.clear();
- _statTotalRecordsRecieved1min =
statTotalRecordsRecieved1min.labels(metricsLabels);
+ statTotalRecordsReceived1min.clear();
+ _statTotalRecordsReceived1min =
statTotalRecordsReceived1min.labels(metricsLabels);
latestUserExceptions.clear();
latestSystemExceptions.clear();
- latestSourceExceptions.clear();
- latestSinkExceptions.clear();
- }
-
- public String getStatsAsString() throws IOException {
- StringWriter outputWriter = new StringWriter();
-
- TextFormat.write004(outputWriter,
collectorRegistry.metricFamilySamples());
-
- return outputWriter.toString();
- }
-
- @Override
- public void close() {
- if (scheduledFuture != null) {
- scheduledFuture.cancel(false);
- scheduledFuture = null;
- }
}
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
new file mode 100644
index 0000000..14e4c21
--- /dev/null
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.stats;
+
+import com.google.common.collect.EvictingQueue;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
+import lombok.Getter;
+import org.apache.pulsar.common.util.RateLimiter;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+
+import java.util.Arrays;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class SinkStatsManager extends ComponentStatsManager {
+
+ public static final String PULSAR_SINK_METRICS_PREFIX = "pulsar_sink_";
+
+ /** Declare metric names **/
+ public static final String SYSTEM_EXCEPTIONS_TOTAL =
"system_exceptions_total";
+ public static final String SINK_EXCEPTIONS_TOTAL = "sink_exceptions_total";
+ public static final String LAST_INVOCATION = "last_invocation";
+ public static final String RECEIVED_TOTAL = "received_total";
+ public static final String WRITTEN_TOTAL = "written_total";
+
+ public static final String SYSTEM_EXCEPTIONS_TOTAL_1min =
"system_exceptions_total_1min";
+ public static final String SINK_EXCEPTIONS_TOTAL_1min =
"sink_exceptions_total_1min";
+ public static final String RECEIVED_TOTAL_1min = "received_total_1min";
+ public static final String WRITTEN_TOTAL_1min = "written_total_1min";
+
+ /** Declare Prometheus stats **/
+
+ private final Counter statTotalRecordsReceived;
+
+ private final Counter statTotalSysExceptions;
+
+ private final Counter statTotalSinkExceptions;
+
+ private final Counter statTotalWritten;
+
+ private final Gauge statlastInvocation;
+
+ // windowed metrics
+ private final Counter statTotalRecordsReceived1min;
+
+ private final Counter statTotalSysExceptions1min;
+
+ private final Counter statTotalSinkExceptions1min;
+
+ private final Counter statTotalWritten1min;
+
+ // exceptions
+
+ final Gauge sysExceptions;
+
+ final Gauge sinkExceptions;
+
+ // As an optimization
+ private final Counter.Child _statTotalRecordsReceived;
+ private final Counter.Child _statTotalSysExceptions;
+ private final Counter.Child _statTotalSinkExceptions;
+ private final Counter.Child _statTotalWritten;
+ private final Gauge.Child _statlastInvocation;
+
+ private Counter.Child _statTotalRecordsReceived1min;
+ private Counter.Child _statTotalSysExceptions1min;
+ private Counter.Child _statTotalSinkExceptions1min;
+ private Counter.Child _statTotalWritten1min;
+
+ @Getter
+ private
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
latestSystemExceptions = EvictingQueue.create(10);
+ @Getter
+ private
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
latestSinkExceptions = EvictingQueue.create(10);
+
+ private final RateLimiter sysExceptionRateLimiter;
+
+ private final RateLimiter sinkExceptionRateLimiter;
+
+
+ public SinkStatsManager(CollectorRegistry collectorRegistry, String[]
metricsLabels, ScheduledExecutorService
+ scheduledExecutorService) {
+ super(collectorRegistry, metricsLabels, scheduledExecutorService);
+
+ statTotalRecordsReceived = Counter.build()
+ .name(PULSAR_SINK_METRICS_PREFIX + RECEIVED_TOTAL)
+ .help("Total number of records sink has received from Pulsar
topic(s).")
+ .labelNames(metricsLabelNames)
+ .register(collectorRegistry);
+ _statTotalRecordsReceived =
statTotalRecordsReceived.labels(metricsLabels);
+
+ statTotalSysExceptions = Counter.build()
+ .name(PULSAR_SINK_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL)
+ .help("Total number of system exceptions.")
+ .labelNames(metricsLabelNames)
+ .register(collectorRegistry);
+ _statTotalSysExceptions = statTotalSysExceptions.labels(metricsLabels);
+
+ statTotalSinkExceptions = Counter.build()
+ .name(PULSAR_SINK_METRICS_PREFIX + SINK_EXCEPTIONS_TOTAL)
+ .help("Total number of sink exceptions.")
+ .labelNames(metricsLabelNames)
+ .register(collectorRegistry);
+ _statTotalSinkExceptions =
statTotalSinkExceptions.labels(metricsLabels);
+
+ statTotalWritten = Counter.build()
+ .name(PULSAR_SINK_METRICS_PREFIX + WRITTEN_TOTAL)
+ .help("Total number of records processed by sink.")
+ .labelNames(metricsLabelNames)
+ .register(collectorRegistry);
+ _statTotalWritten = statTotalWritten.labels(metricsLabels);
+
+ statlastInvocation = Gauge.build()
+ .name(PULSAR_SINK_METRICS_PREFIX + LAST_INVOCATION)
+ .help("The timestamp of the last invocation of the sink.")
+ .labelNames(metricsLabelNames)
+ .register(collectorRegistry);
+ _statlastInvocation = statlastInvocation.labels(metricsLabels);
+
+ statTotalRecordsReceived1min = Counter.build()
+ .name(PULSAR_SINK_METRICS_PREFIX + RECEIVED_TOTAL_1min)
+ .help("Total number of messages sink has received from Pulsar
topic(s) in the last 1 minute.")
+ .labelNames(metricsLabelNames)
+ .register(collectorRegistry);
+ _statTotalRecordsReceived1min =
statTotalRecordsReceived1min.labels(metricsLabels);
+
+ statTotalSysExceptions1min = Counter.build()
+ .name(PULSAR_SINK_METRICS_PREFIX +
SYSTEM_EXCEPTIONS_TOTAL_1min)
+ .help("Total number of system exceptions in the last 1
minute.")
+ .labelNames(metricsLabelNames)
+ .register(collectorRegistry);
+ _statTotalSysExceptions1min =
statTotalSysExceptions1min.labels(metricsLabels);
+
+ statTotalSinkExceptions1min = Counter.build()
+ .name(PULSAR_SINK_METRICS_PREFIX + SINK_EXCEPTIONS_TOTAL_1min)
+ .help("Total number of sink exceptions in the last 1 minute.")
+ .labelNames(metricsLabelNames)
+ .register(collectorRegistry);
+ _statTotalSinkExceptions1min =
statTotalSinkExceptions1min.labels(metricsLabels);
+
+ statTotalWritten1min = Counter.build()
+ .name(PULSAR_SINK_METRICS_PREFIX + WRITTEN_TOTAL_1min)
+ .help("Total number of records processed by sink the last 1
minute.")
+ .labelNames(metricsLabelNames)
+ .register(collectorRegistry);
+ _statTotalWritten1min = statTotalWritten1min.labels(metricsLabels);
+
+ sysExceptions = Gauge.build()
+ .name(PULSAR_SINK_METRICS_PREFIX + "system_exception")
+ .labelNames(exceptionMetricsLabelNames)
+ .help("Exception from system code.")
+ .register(collectorRegistry);
+
+ sinkExceptions = Gauge.build()
+ .name(PULSAR_SINK_METRICS_PREFIX + "sink_exception")
+ .labelNames(exceptionMetricsLabelNames)
+ .help("Exception from sink.")
+ .register(collectorRegistry);
+
+ sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5,
1, TimeUnit.MINUTES);
+ sinkExceptionRateLimiter = new RateLimiter(scheduledExecutorService,
5, 1, TimeUnit.MINUTES);
+ }
+
+ @Override
+ public void reset() {
+ statTotalRecordsReceived1min.clear();
+ _statTotalRecordsReceived1min =
statTotalRecordsReceived1min.labels(metricsLabels);
+
+ statTotalSysExceptions1min.clear();
+ _statTotalSysExceptions1min =
statTotalSysExceptions1min.labels(metricsLabels);
+
+ statTotalSinkExceptions1min.clear();
+ _statTotalSinkExceptions1min =
statTotalSinkExceptions1min.labels(metricsLabels);
+
+ statTotalWritten1min.clear();
+ _statTotalWritten1min = statTotalWritten1min.labels(metricsLabels);
+
+ latestSystemExceptions.clear();
+ latestSinkExceptions.clear();
+ }
+
+ @Override
+ public void incrTotalReceived() {
+ _statTotalRecordsReceived.inc();
+ _statTotalRecordsReceived1min.inc();
+ }
+
+ @Override
+ public void incrTotalProcessedSuccessfully() {
+ _statTotalWritten.inc();
+ _statTotalWritten1min.inc();
+ }
+
+ @Override
+ public void incrSysExceptions(Throwable ex) {
+ long ts = System.currentTimeMillis();
+ InstanceCommunication.FunctionStatus.ExceptionInformation info =
getExceptionInfo(ex, ts);
+ latestSystemExceptions.add(info);
+
+ // report exception throw prometheus
+ if (sysExceptionRateLimiter.tryAcquire()) {
+ String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels,
metricsLabels.length + 2);
+ exceptionMetricsLabels[exceptionMetricsLabels.length - 2] =
ex.getMessage() != null ? ex.getMessage() : "";
+ exceptionMetricsLabels[exceptionMetricsLabels.length - 1] =
String.valueOf(ts);
+ sysExceptions.labels(exceptionMetricsLabels).set(1.0);
+ }
+ }
+
+ @Override
+ public void incrUserExceptions(Exception ex) {
+ incrSysExceptions(ex);
+ }
+
+ @Override
+ public void incrSourceExceptions(Exception ex) {
+ incrSysExceptions(ex);
+ }
+
+ @Override
+ public void incrSinkExceptions(Exception ex) {
+ long ts = System.currentTimeMillis();
+ InstanceCommunication.FunctionStatus.ExceptionInformation info =
getExceptionInfo(ex, ts);
+ latestSinkExceptions.add(info);
+
+ // report exception throw prometheus
+ if (sinkExceptionRateLimiter.tryAcquire()) {
+ String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels,
metricsLabels.length + 2);
+ exceptionMetricsLabels[exceptionMetricsLabels.length - 2] =
ex.getMessage() != null ? ex.getMessage() : "";
+ exceptionMetricsLabels[exceptionMetricsLabels.length - 1] =
String.valueOf(ts);
+ sinkExceptions.labels(exceptionMetricsLabels).set(1.0);
+ }
+ }
+
+ @Override
+ public void setLastInvocation(long ts) {
+ _statlastInvocation.set(ts);
+ }
+
+ @Override
+ public void processTimeStart() {
+ //no-op: this manager reports no process latency (getAvgProcessLatency() returns 0)
+ }
+
+ @Override
+ public void processTimeEnd() {
+ //no-op: this manager reports no process latency (getAvgProcessLatency() returns 0)
+ }
+
+ @Override
+ public double getTotalProcessedSuccessfully() {
+ return _statTotalWritten.get();
+ }
+
+ @Override
+ public double getTotalRecordsReceived() {
+ return _statTotalRecordsReceived.get();
+ }
+
+ @Override
+ public double getTotalSysExceptions() {
+ return _statTotalSysExceptions.get();
+ }
+
+ @Override
+ public double getTotalUserExceptions() {
+ return 0;
+ }
+
+ @Override
+ public double getLastInvocation() {
+ return _statlastInvocation.get();
+ }
+
+ @Override
+ public double getAvgProcessLatency() {
+ return 0;
+ }
+
+ @Override
+ public double getTotalProcessedSuccessfully1min() {
+ return _statTotalWritten1min.get();
+ }
+
+ @Override
+ public double getTotalRecordsReceived1min() {
+ return _statTotalRecordsReceived1min.get();
+ }
+
+ @Override
+ public double getTotalSysExceptions1min() {
+ return _statTotalSysExceptions1min.get();
+ }
+
+ @Override
+ public double getTotalUserExceptions1min() {
+ return 0;
+ }
+
+ @Override
+ public double getAvgProcessLatency1min() {
+ return 0;
+ }
+
+ @Override
+ public
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
getLatestUserExceptions() {
+ return EMPTY_QUEUE;
+ }
+
+ @Override
+ public
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
getLatestSystemExceptions() {
+ return latestSystemExceptions;
+ }
+
+ @Override
+ public
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
getLatestSourceExceptions() {
+ return EMPTY_QUEUE;
+ }
+
+ @Override
+ public
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
getLatestSinkExceptions() {
+ return latestSinkExceptions;
+ }
+}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
new file mode 100644
index 0000000..5679f2e
--- /dev/null
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.stats;
+
+import com.google.common.collect.EvictingQueue;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
+import lombok.Getter;
+import org.apache.pulsar.common.util.RateLimiter;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+
+import java.util.Arrays;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class SourceStatsManager extends ComponentStatsManager {
+
+ public static final String PULSAR_SOURCE_METRICS_PREFIX = "pulsar_source_";
+
+ /** Declare metric names **/
+ public static final String SYSTEM_EXCEPTIONS_TOTAL =
"system_exceptions_total";
+ public static final String SOURCE_EXCEPTIONS_TOTAL =
"source_exceptions_total";
+ public static final String LAST_INVOCATION = "last_invocation";
+ public static final String RECEIVED_TOTAL = "received_total";
+ public static final String WRITTEN_TOTAL = "written_total";
+
+ public static final String SYSTEM_EXCEPTIONS_TOTAL_1min =
"system_exceptions_total_1min";
+ public static final String SOURCE_EXCEPTIONS_TOTAL_1min =
"source_exceptions_total_1min";
+ public static final String RECEIVED_TOTAL_1min = "received_total_1min";
+ public static final String WRITTEN_TOTAL_1min = "written_total_1min";
+
+ /** Declare Prometheus stats **/
+
+ private final Counter statTotalRecordsReceived;
+
+ private final Counter statTotalSysExceptions;
+
+ private final Counter statTotalSourceExceptions;
+
+ private final Counter statTotalWritten;
+
+ private final Gauge statlastInvocation;
+
+ // windowed metrics
+ private final Counter statTotalRecordsReceived1min;
+
+ private final Counter statTotalSysExceptions1min;
+
+ private final Counter statTotalSourceExceptions1min;
+
+ private final Counter statTotalWritten1min;
+
+ // exceptions
+
+ final Gauge sysExceptions;
+
+ final Gauge sourceExceptions;
+
+ // As an optimization
+ private final Counter.Child _statTotalRecordsReceived;
+ private final Counter.Child _statTotalSysExceptions;
+ private final Counter.Child _statTotalSourceExceptions;
+ private final Counter.Child _statTotalWritten;
+ private final Gauge.Child _statlastInvocation;
+
+ private Counter.Child _statTotalRecordsReceived1min;
+ private Counter.Child _statTotalSysExceptions1min;
+ private Counter.Child _statTotalSourceExceptions1min;
+ private Counter.Child _statTotalWritten1min;
+
+ @Getter
+ private
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
latestSystemExceptions = EvictingQueue.create(10);
+ @Getter
+ private
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation>
latestSourceExceptions = EvictingQueue.create(10);
+
+ protected final RateLimiter sysExceptionRateLimiter;
+
+ protected final RateLimiter sourceExceptionRateLimiter;
+
+    /**
+     * Registers every {@code pulsar_source_*} metric with the given registry and binds the
+     * per-instance label values so later updates can use the cached children directly.
+     *
+     * <p>Metrics are registered in a fixed order: lifetime counters, the last-invocation gauge,
+     * the one-minute counters, then the two exception gauges.
+     *
+     * @param collectorRegistry registry that receives all metrics created here
+     * @param metricsLabels label values identifying this instance; passed to {@code labels(...)}
+     * @param scheduledExecutorService executor handed to the superclass and to both exception rate limiters
+     */
+    public SourceStatsManager(CollectorRegistry collectorRegistry, String[] metricsLabels,
+                              ScheduledExecutorService scheduledExecutorService) {
+        super(collectorRegistry, metricsLabels, scheduledExecutorService);
+
+        statTotalRecordsReceived = buildSourceCounter(collectorRegistry, RECEIVED_TOTAL,
+                "Total number of records received from source.");
+        _statTotalRecordsReceived = statTotalRecordsReceived.labels(metricsLabels);
+
+        statTotalSysExceptions = buildSourceCounter(collectorRegistry, SYSTEM_EXCEPTIONS_TOTAL,
+                "Total number of system exceptions.");
+        _statTotalSysExceptions = statTotalSysExceptions.labels(metricsLabels);
+
+        statTotalSourceExceptions = buildSourceCounter(collectorRegistry, SOURCE_EXCEPTIONS_TOTAL,
+                "Total number of source exceptions.");
+        _statTotalSourceExceptions = statTotalSourceExceptions.labels(metricsLabels);
+
+        statTotalWritten = buildSourceCounter(collectorRegistry, WRITTEN_TOTAL,
+                "Total number of records written to a Pulsar topic.");
+        _statTotalWritten = statTotalWritten.labels(metricsLabels);
+
+        statlastInvocation = buildSourceGauge(collectorRegistry, LAST_INVOCATION,
+                "The timestamp of the last invocation of the source.", metricsLabelNames);
+        _statlastInvocation = statlastInvocation.labels(metricsLabels);
+
+        statTotalRecordsReceived1min = buildSourceCounter(collectorRegistry, RECEIVED_TOTAL_1min,
+                "Total number of records received from source in the last 1 minute.");
+        _statTotalRecordsReceived1min = statTotalRecordsReceived1min.labels(metricsLabels);
+
+        statTotalSysExceptions1min = buildSourceCounter(collectorRegistry, SYSTEM_EXCEPTIONS_TOTAL_1min,
+                "Total number of system exceptions in the last 1 minute.");
+        _statTotalSysExceptions1min = statTotalSysExceptions1min.labels(metricsLabels);
+
+        statTotalSourceExceptions1min = buildSourceCounter(collectorRegistry, SOURCE_EXCEPTIONS_TOTAL_1min,
+                "Total number of source exceptions in the last 1 minute.");
+        _statTotalSourceExceptions1min = statTotalSourceExceptions1min.labels(metricsLabels);
+
+        statTotalWritten1min = buildSourceCounter(collectorRegistry, WRITTEN_TOTAL_1min,
+                "Total number of records written to a Pulsar topic in the last 1 minute.");
+        _statTotalWritten1min = statTotalWritten1min.labels(metricsLabels);
+
+        // Exception gauges use the extended label set; their children are created per exception.
+        sysExceptions = buildSourceGauge(collectorRegistry, "system_exception",
+                "Exception from system code.", exceptionMetricsLabelNames);
+
+        sourceExceptions = buildSourceGauge(collectorRegistry, "source_exception",
+                "Exception from source.", exceptionMetricsLabelNames);
+
+        sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
+        sourceExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
+    }
+
+    /** Builds and registers one prefixed source counter that uses the standard metric label names. */
+    private Counter buildSourceCounter(CollectorRegistry registry, String metricName, String help) {
+        return Counter.build()
+                .name(PULSAR_SOURCE_METRICS_PREFIX + metricName)
+                .help(help)
+                .labelNames(metricsLabelNames)
+                .register(registry);
+    }
+
+    /** Builds and registers one prefixed source gauge with the supplied label names. */
+    private Gauge buildSourceGauge(CollectorRegistry registry, String metricName, String help,
+                                   String... labelNames) {
+        return Gauge.build()
+                .name(PULSAR_SOURCE_METRICS_PREFIX + metricName)
+                .help(help)
+                .labelNames(labelNames)
+                .register(registry);
+    }
+
+    /**
+     * Starts a fresh window: empties both recent-exception queues, clears every one-minute
+     * counter and re-fetches its labelled child. Lifetime counters are not touched.
+     */
+    @Override
+    public void reset() {
+        latestSystemExceptions.clear();
+        latestSourceExceptions.clear();
+
+        // Each cached child is looked up again right after its parent counter is cleared.
+        statTotalRecordsReceived1min.clear();
+        _statTotalRecordsReceived1min = statTotalRecordsReceived1min.labels(metricsLabels);
+
+        statTotalSysExceptions1min.clear();
+        _statTotalSysExceptions1min = statTotalSysExceptions1min.labels(metricsLabels);
+
+        statTotalSourceExceptions1min.clear();
+        _statTotalSourceExceptions1min = statTotalSourceExceptions1min.labels(metricsLabels);
+
+        statTotalWritten1min.clear();
+        _statTotalWritten1min = statTotalWritten1min.labels(metricsLabels);
+    }
+
+    /** Counts one received record in both the one-minute and the lifetime counter. */
+    @Override
+    public void incrTotalReceived() {
+        _statTotalRecordsReceived1min.inc();
+        _statTotalRecordsReceived.inc();
+    }
+
+    /** Counts one written record in both the one-minute and the lifetime "written" counter. */
+    @Override
+    public void incrTotalProcessedSuccessfully() {
+        _statTotalWritten1min.inc();
+        _statTotalWritten.inc();
+    }
+
+    /**
+     * Records a system exception: increments the lifetime and one-minute system-exception
+     * counters, stores the exception details in {@code latestSystemExceptions}, and, when
+     * {@code sysExceptionRateLimiter} grants a permit, publishes it as a gauge sample.
+     *
+     * @param ex the exception to record
+     */
+    @Override
+    public void incrSysExceptions(Throwable ex) {
+        // Keep the counters in step with the recorded exceptions; they back
+        // getTotalSysExceptions() / getTotalSysExceptions1min() and the exported totals.
+        _statTotalSysExceptions.inc();
+        _statTotalSysExceptions1min.inc();
+
+        long ts = System.currentTimeMillis();
+        InstanceCommunication.FunctionStatus.ExceptionInformation info = getExceptionInfo(ex, ts);
+        latestSystemExceptions.add(info);
+
+        // report exception through prometheus
+        if (sysExceptionRateLimiter.tryAcquire()) {
+            // The two extra trailing label values are the exception message and the timestamp.
+            String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
+            exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : "";
+            exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
+            sysExceptions.labels(exceptionMetricsLabels).set(1.0);
+        }
+    }
+
+    /**
+     * This class keeps no separate user-exception metric, so the exception is recorded
+     * as a system exception.
+     */
+    @Override
+    public void incrUserExceptions(Exception ex) {
+        this.incrSysExceptions(ex);
+    }
+
+    /**
+     * Records an exception raised by the source: increments the lifetime and one-minute
+     * source-exception counters, stores the exception details in {@code latestSourceExceptions},
+     * and, when {@code sourceExceptionRateLimiter} grants a permit, publishes it as a gauge sample.
+     *
+     * @param ex the exception to record
+     */
+    @Override
+    public void incrSourceExceptions(Exception ex) {
+        // Keep the exported source_exceptions_total metrics in step with the recorded exceptions.
+        _statTotalSourceExceptions.inc();
+        _statTotalSourceExceptions1min.inc();
+
+        long ts = System.currentTimeMillis();
+        InstanceCommunication.FunctionStatus.ExceptionInformation info = getExceptionInfo(ex, ts);
+        latestSourceExceptions.add(info);
+
+        // report exception through prometheus
+        if (sourceExceptionRateLimiter.tryAcquire()) {
+            // The two extra trailing label values are the exception message and the timestamp.
+            String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
+            exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : "";
+            exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
+            sourceExceptions.labels(exceptionMetricsLabels).set(1.0);
+        }
+    }
+
+    /**
+     * This class keeps no sink-exception metric, so the exception is recorded
+     * as a system exception.
+     */
+    @Override
+    public void incrSinkExceptions(Exception ex) {
+        this.incrSysExceptions(ex);
+    }
+
+    /**
+     * Stores {@code ts} in the last-invocation gauge.
+     *
+     * @param ts timestamp value to publish
+     */
+    @Override
+    public void setLastInvocation(long ts) {
+        final Gauge.Child lastInvocationGauge = _statlastInvocation;
+        lastInvocationGauge.set(ts);
+    }
+
+    /** Intentionally empty: this manager records no processing latency. */
+    @Override
+    public void processTimeStart() {
+        // nothing to do
+    }
+
+    /** Intentionally empty: this manager records no processing latency. */
+    @Override
+    public void processTimeEnd() {
+        // nothing to do
+    }
+
+    /** Returns the lifetime count of records written, read from the "written" counter. */
+    @Override
+    public double getTotalProcessedSuccessfully() {
+        final double written = _statTotalWritten.get();
+        return written;
+    }
+
+    /** Returns the lifetime count of records received from the source. */
+    @Override
+    public double getTotalRecordsReceived() {
+        final double received = _statTotalRecordsReceived.get();
+        return received;
+    }
+
+    /** Returns the current value of the lifetime system-exception counter. */
+    @Override
+    public double getTotalSysExceptions() {
+        final double sysExceptionCount = _statTotalSysExceptions.get();
+        return sysExceptionCount;
+    }
+
+    /** Always zero: this class declares no user-exception counter. */
+    @Override
+    public double getTotalUserExceptions() {
+        return 0.0;
+    }
+
+    /** Returns the value most recently stored by {@code setLastInvocation}. */
+    @Override
+    public double getLastInvocation() {
+        final double lastInvocation = _statlastInvocation.get();
+        return lastInvocation;
+    }
+
+    /** Always zero: the process-time hooks of this class are no-ops, so no latency is measured. */
+    @Override
+    public double getAvgProcessLatency() {
+        return 0.0;
+    }
+
+    /** Returns the count of records written since the one-minute counters were last reset. */
+    @Override
+    public double getTotalProcessedSuccessfully1min() {
+        final double writtenInWindow = _statTotalWritten1min.get();
+        return writtenInWindow;
+    }
+
+    /** Returns the count of records received since the one-minute counters were last reset. */
+    @Override
+    public double getTotalRecordsReceived1min() {
+        final double receivedInWindow = _statTotalRecordsReceived1min.get();
+        return receivedInWindow;
+    }
+
+    /** Returns the current value of the one-minute system-exception counter. */
+    @Override
+    public double getTotalSysExceptions1min() {
+        final double sysExceptionsInWindow = _statTotalSysExceptions1min.get();
+        return sysExceptionsInWindow;
+    }
+
+    /** Always zero: this class declares no one-minute user-exception counter. */
+    @Override
+    public double getTotalUserExceptions1min() {
+        return 0.0;
+    }
+
+    /** Always zero: the process-time hooks of this class are no-ops, so no latency is measured. */
+    @Override
+    public double getAvgProcessLatency1min() {
+        return 0.0;
+    }
+
+    /**
+     * Returns an empty queue: this class records no user exceptions
+     * ({@code incrUserExceptions} delegates to {@code incrSysExceptions}).
+     *
+     * @return the shared {@code EMPTY_QUEUE}
+     */
+    @Override
+    public EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> getLatestUserExceptions() {
+        // Reuse the shared constant instead of allocating a zero-capacity queue on every call.
+        return EMPTY_QUEUE;
+    }
+
+    /**
+     * Returns the system exceptions recorded by {@code incrSysExceptions} since the last
+     * {@code reset()}.
+     *
+     * @return the live {@code latestSystemExceptions} queue (not a copy)
+     */
+    @Override
+    public EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> getLatestSystemExceptions() {
+        // Expose the queue this class actually fills; EMPTY_QUEUE would hide every recorded exception.
+        return latestSystemExceptions;
+    }
+
+    /**
+     * Returns the source exceptions recorded by {@code incrSourceExceptions} since the last
+     * {@code reset()}.
+     *
+     * @return the live {@code latestSourceExceptions} queue (not a copy)
+     */
+    @Override
+    public EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> getLatestSourceExceptions() {
+        // Expose the queue this class actually fills; EMPTY_QUEUE would hide every recorded exception.
+        return latestSourceExceptions;
+    }
+
+    /**
+     * Returns an empty queue: this class records no sink exceptions
+     * ({@code incrSinkExceptions} delegates to {@code incrSysExceptions}).
+     *
+     * @return the shared {@code EMPTY_QUEUE}
+     */
+    @Override
+    public EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> getLatestSinkExceptions() {
+        // Reuse the shared constant instead of allocating a zero-capacity queue on every call.
+        return EMPTY_QUEUE;
+    }
+}
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index 77427f7..278cd5a 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -39,6 +39,7 @@ import
org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.functions.instance.state.StateContextImpl;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import
org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider;
+import org.apache.pulsar.functions.utils.Utils;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
@@ -75,8 +76,8 @@ public class ContextImplTest {
logger,
client,
new ArrayList<>(),
- new EnvironmentBasedSecretsProvider(), new CollectorRegistry(),
new String[0]
- );
+ new EnvironmentBasedSecretsProvider(), new CollectorRegistry(),
new String[0],
+ Utils.ComponentType.FUNCTION);
}
@Test(expected = IllegalStateException.class)
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
index 1c5d64d..b50bcd5 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
@@ -364,4 +364,21 @@ public class Utils {
String functionName, int
instanceId) {
return String.format("%s/%s/%s:%d", tenant, namespace, functionName,
instanceId);
}
+
+    /**
+     * Distinguishes functions, sources and sinks. {@link #toString()} yields the capitalised
+     * display name ("Function", "Source", "Sink") rather than the constant name.
+     */
+    public enum ComponentType {
+        FUNCTION("Function"),
+        SOURCE("Source"),
+        SINK("Sink");
+
+        /** Display name returned by {@link #toString()}. */
+        private final String label;
+
+        ComponentType(String label) {
+            this.label = label;
+        }
+
+        /** Returns the display name given to the constant at construction. */
+        @Override
+        public String toString() {
+            return this.label;
+        }
+    }
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 0e4b1b1..50afdc6 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -72,9 +72,9 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.join;
import static org.apache.pulsar.functions.utils.Utils.*;
-import static
org.apache.pulsar.functions.worker.rest.api.ComponentImpl.ComponentType.FUNCTION;
-import static
org.apache.pulsar.functions.worker.rest.api.ComponentImpl.ComponentType.SINK;
-import static
org.apache.pulsar.functions.worker.rest.api.ComponentImpl.ComponentType.SOURCE;
+import static org.apache.pulsar.functions.utils.Utils.ComponentType.FUNCTION;
+import static org.apache.pulsar.functions.utils.Utils.ComponentType.SINK;
+import static org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -122,23 +122,6 @@ import net.jodah.typetools.TypeResolver;
@Slf4j
public abstract class ComponentImpl {
- public enum ComponentType {
- FUNCTION("Function"),
- SOURCE("Source"),
- SINK("Sink");
-
- private final String componentName;
-
- ComponentType(String componentName) {
- this.componentName = componentName;
- }
-
- @Override
- public String toString() {
- return componentName;
- }
- }
-
private final AtomicReference<StorageClient> storageClient = new
AtomicReference<>();
protected final Supplier<WorkerService> workerServiceSupplier;
protected final ComponentType componentType;
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index f3e41a1..be8fd49 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -24,6 +24,7 @@ import
org.apache.pulsar.common.policies.data.ExceptionInformation;
import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.RestException;
@@ -199,7 +200,7 @@ public class FunctionsImpl extends ComponentImpl {
}
public FunctionsImpl(Supplier<WorkerService> workerServiceSupplier) {
- super(workerServiceSupplier, ComponentType.FUNCTION);
+ super(workerServiceSupplier, Utils.ComponentType.FUNCTION);
}
/**
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
index 846900f..4a801b2 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.common.policies.data.SinkStatus;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.utils.SinkConfigUtils;
+import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.RestException;
@@ -206,7 +207,7 @@ public class SinkImpl extends ComponentImpl {
}
public SinkImpl(Supplier<WorkerService> workerServiceSupplier) {
- super(workerServiceSupplier, ComponentType.SINK);
+ super(workerServiceSupplier, Utils.ComponentType.SINK);
}
public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData
getSinkInstanceStatus(final String tenant,
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
index ac096bb..ec7ba30 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.common.policies.data.SourceStatus;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
+import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.RestException;
@@ -204,7 +205,7 @@ public class SourceImpl extends ComponentImpl {
}
public SourceImpl(Supplier<WorkerService> workerServiceSupplier) {
- super(workerServiceSupplier, ComponentType.SOURCE);
+ super(workerServiceSupplier, Utils.ComponentType.SOURCE);
}
public SourceStatus getSourceStatus(final String tenant,
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index aa22650..d8d54c9 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -76,6 +76,7 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import static org.apache.pulsar.functions.utils.Utils.ComponentType.FUNCTION;
import static org.apache.pulsar.functions.utils.Utils.mergeJson;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
@@ -179,7 +180,7 @@ public class FunctionApiV2ResourceTest {
FunctionsImpl functions = spy(new FunctionsImpl(() ->
mockedWorkerService));
-
doReturn(ComponentImpl.ComponentType.FUNCTION).when(functions).calculateSubjectType(any());
+ doReturn(FUNCTION).when(functions).calculateSubjectType(any());
this.resource = spy(new FunctionsImplV2(functions));
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index 580345e..7695993 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -76,6 +76,9 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import static org.apache.pulsar.functions.utils.Utils.ComponentType.FUNCTION;
+import static org.apache.pulsar.functions.utils.Utils.ComponentType.SINK;
+import static org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
@@ -177,7 +180,7 @@ public class FunctionApiV3ResourceTest {
when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
this.resource = spy(new FunctionsImpl(() -> mockedWorkerService));
-
doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(any());
+ doReturn(FUNCTION).when(this.resource).calculateSubjectType(any());
}
//
@@ -1383,9 +1386,9 @@ public class FunctionApiV3ResourceTest {
FunctionDetails.newBuilder().setName("test-3").build()).build();
functionMetaDataList.add(f3);
when(mockedManager.listFunctions(eq(tenant),
eq(namespace))).thenReturn(functionMetaDataList);
-
doReturn(ComponentImpl.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
-
doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
-
doReturn(ComponentImpl.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
+ doReturn(SOURCE).when(this.resource).calculateSubjectType(f1);
+ doReturn(FUNCTION).when(this.resource).calculateSubjectType(f2);
+ doReturn(SINK).when(this.resource).calculateSubjectType(f3);
List<String> functionList = listDefaultFunctions();
assertEquals(functions, functionList);
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index bec3ca9..9e55800 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -47,7 +47,6 @@ import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.request.RequestResult;
import org.apache.pulsar.functions.worker.rest.RestException;
-import org.apache.pulsar.functions.worker.rest.api.ComponentImpl;
import org.apache.pulsar.functions.worker.rest.api.SinkImpl;
import org.apache.pulsar.io.cassandra.CassandraStringSink;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
@@ -175,7 +174,7 @@ public class SinkApiV3ResourceTest {
when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
this.resource = spy(new SinkImpl(() -> mockedWorkerService));
-
Mockito.doReturn(ComponentImpl.ComponentType.SINK).when(this.resource).calculateSubjectType(any());
+
Mockito.doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.SINK).when(this.resource).calculateSubjectType(any());
}
//
@@ -1308,9 +1307,9 @@ public class SinkApiV3ResourceTest {
FunctionDetails.newBuilder().setName("test-3").build()).build();
functionMetaDataList.add(f3);
when(mockedManager.listFunctions(eq(tenant),
eq(namespace))).thenReturn(functionMetaDataList);
-
doReturn(ComponentImpl.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
-
doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
-
doReturn(ComponentImpl.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
+
doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
+
doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
+
doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
List<String> sinkList = listDefaultSinks();
assertEquals(functions, sinkList);
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index 72d23e2..6ba8914 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -50,7 +50,6 @@ import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.request.RequestResult;
import org.apache.pulsar.functions.worker.rest.RestException;
-import org.apache.pulsar.functions.worker.rest.api.ComponentImpl;
import org.apache.pulsar.functions.worker.rest.api.SourceImpl;
import org.apache.pulsar.io.twitter.TwitterFireHose;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
@@ -170,7 +169,7 @@ public class SourceApiV3ResourceTest {
when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
this.resource = spy(new SourceImpl(() -> mockedWorkerService));
-
Mockito.doReturn(ComponentImpl.ComponentType.SOURCE).when(this.resource).calculateSubjectType(any());
+
Mockito.doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE).when(this.resource).calculateSubjectType(any());
}
//
@@ -1322,9 +1321,9 @@ public class SourceApiV3ResourceTest {
FunctionDetails.newBuilder().setName("test-3").build()).build();
functionMetaDataList.add(f3);
when(mockedManager.listFunctions(eq(tenant),
eq(namespace))).thenReturn(functionMetaDataList);
-
doReturn(ComponentImpl.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
-
doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
-
doReturn(ComponentImpl.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
+
doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
+
doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
+
doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
List<String> sourceList = listDefaultSources();
assertEquals(functions, sourceList);
diff --git a/pulsar-io/data-genenator/pom.xml b/pulsar-io/data-generator/pom.xml
similarity index 100%
rename from pulsar-io/data-genenator/pom.xml
rename to pulsar-io/data-generator/pom.xml
diff --git
a/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorPrintSink.java
b/pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorPrintSink.java
similarity index 100%
rename from
pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorPrintSink.java
rename to
pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorPrintSink.java
diff --git
a/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java
b/pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java
similarity index 100%
rename from
pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java
rename to
pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java
diff --git
a/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/Person.java
b/pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/Person.java
similarity index 100%
rename from
pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/Person.java
rename to
pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/Person.java
diff --git
a/pulsar-io/data-genenator/src/main/resources/META-INF/services/pulsar-io.yaml
b/pulsar-io/data-generator/src/main/resources/META-INF/services/pulsar-io.yaml
similarity index 100%
rename from
pulsar-io/data-genenator/src/main/resources/META-INF/services/pulsar-io.yaml
rename to
pulsar-io/data-generator/src/main/resources/META-INF/services/pulsar-io.yaml
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index 628686b..ea433ce 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -42,7 +42,7 @@
<module>kinesis</module>
<module>hdfs3</module>
<module>jdbc</module>
- <module>data-genenator</module>
+ <module>data-generator</module>
<module>elastic-search</module>
<module>kafka-connect-adaptor</module>
<module>debezium</module>