This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5f779b4 Add function metrics with function-stats to get metrics on-demand (#2130)
5f779b4 is described below
commit 5f779b4ce541e1201354ca5454bebf1276e93f0b
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Mon Jul 16 14:00:07 2018 -0700
Add function metrics with function-stats to get metrics on-demand (#2130)
---
.../org/apache/pulsar/io/PulsarSinkE2ETest.java | 84 ++++++++++++++++++++--
.../functions/instance/JavaInstanceRunnable.java | 22 ++++--
.../src/main/python/InstanceCommunication_pb2.py | 44 +++++++-----
.../instance/src/main/python/python_instance.py | 1 +
.../src/main/proto/InstanceCommunication.proto | 1 +
.../pulsar/functions/runtime/ThreadRuntime.java | 13 +++-
6 files changed, 133 insertions(+), 32 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index 5d70525..3ece269 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -48,22 +48,26 @@ import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
+import org.apache.pulsar.functions.instance.JavaInstanceRunnable;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
+import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import
org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
+import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData;
+import
org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.DataDigest;
import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.WorkerServer;
@@ -244,6 +248,7 @@ public class PulsarSinkE2ETest {
final String sinkTopic = "persistent://" + replNamespace + "/output";
final String propertyKey = "key";
final String propertyValue = "value";
+ final String functionName = "PulsarSink-test";
admin.namespaces().createNamespace(replNamespace);
Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
admin.namespaces().setNamespaceReplicationClusters(replNamespace,
clusters);
@@ -254,7 +259,7 @@ public class PulsarSinkE2ETest {
String jarFilePathUrl = Utils.FILE + ":"
+
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
- FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl,
tenant, namespacePortion, "PulsarSink-test",
+ FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl,
tenant, namespacePortion, functionName,
sinkTopic);
admin.functions().createFunctionWithUrl(functionDetails,
jarFilePathUrl);
@@ -298,7 +303,76 @@ public class PulsarSinkE2ETest {
}
- protected FunctionDetails createSinkConfig(String jarFile, String tenant,
String namespace, String sinkName, String sinkTopic) {
+
+ @Test(timeOut = 20000)
+ public void testPulsarSinkStats() throws Exception {
+
+ final String namespacePortion = "io";
+ final String replNamespace = tenant + "/" + namespacePortion;
+ final String sourceTopic = "persistent://" + replNamespace +
"/my-topic1";
+ final String sinkTopic = "persistent://" + replNamespace + "/output";
+ final String propertyKey = "key";
+ final String propertyValue = "value";
+ final String functionName = "PulsarSink-test";
+ admin.namespaces().createNamespace(replNamespace);
+ Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+ admin.namespaces().setNamespaceReplicationClusters(replNamespace,
clusters);
+
+ // create a producer that creates a topic at broker
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(sourceTopic).create();
+
+ String jarFilePathUrl = Utils.FILE + ":"
+ +
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+ FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl,
tenant, namespacePortion, functionName,
+ sinkTopic);
+ admin.functions().createFunctionWithUrl(functionDetails,
jarFilePathUrl);
+
+ // try to update function to test: update-function functionality
+ admin.functions().updateFunctionWithUrl(functionDetails,
jarFilePathUrl);
+
+ retryStrategically((test) -> {
+ try {
+ return
admin.topics().getStats(sourceTopic).subscriptions.size() == 1;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 5, 150);
+ // validate pulsar sink consumer has started on the topic
+
Assert.assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(),
1);
+
+ int totalMsgs = 10;
+ for (int i = 0; i < totalMsgs; i++) {
+ String data = "my-message-" + i;
+ producer.newMessage().property(propertyKey,
propertyValue).value(data.getBytes()).send();
+ }
+ retryStrategically((test) -> {
+ try {
+ SubscriptionStats subStats =
admin.topics().getStats(sourceTopic).subscriptions.values().iterator()
+ .next();
+ return subStats.unackedMessages == 0;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 5, 500);
+
+ FunctionRuntimeManager functionRuntimeManager =
functionsWorkerService.getFunctionRuntimeManager();
+ functionRuntimeManager.updateRates();
+ FunctionStatusList functionStats =
functionRuntimeManager.getAllFunctionStatus(tenant, namespacePortion,
+ functionName);
+
+ int numInstances = functionStats.getFunctionStatusListCount();
+ Assert.assertEquals(numInstances, 1);
+
+ FunctionStatus stats =
functionStats.getFunctionStatusListList().get(0);
+ Map<String, DataDigest> metricsData =
stats.getMetrics().getMetricsMap();
+
+ double count =
metricsData.get(JavaInstanceRunnable.METRICS_TOTAL_PROCESSED).getCount();
+ double success =
metricsData.get(JavaInstanceRunnable.METRICS_TOTAL_SUCCESS).getCount();
+ Assert.assertEquals((int) count, totalMsgs);
+ Assert.assertEquals((int) success, totalMsgs);
+ }
+
+ protected FunctionDetails createSinkConfig(String jarFile, String tenant,
String namespace, String functionName, String sinkTopic) {
File file = new File(jarFile);
try {
@@ -312,7 +386,7 @@ public class PulsarSinkE2ETest {
FunctionDetails.Builder functionDetailsBuilder =
FunctionDetails.newBuilder();
functionDetailsBuilder.setTenant(tenant);
functionDetailsBuilder.setNamespace(namespace);
- functionDetailsBuilder.setName(sinkName);
+ functionDetailsBuilder.setName(functionName);
functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
functionDetailsBuilder.setParallelism(1);
functionDetailsBuilder.setClassName(IdentityFunction.class.getName());
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 4c29232..33006b2 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -108,6 +108,14 @@ public class JavaInstanceRunnable implements
AutoCloseable, Runnable {
private Source source;
private Sink sink;
+
+ public static final String METRICS_TOTAL_PROCESSED = "__total_processed__";
+ public static final String METRICS_TOTAL_SUCCESS =
"__total_successfully_processed__";
+ public static final String METRICS_TOTAL_SYS_EXCEPTION =
"__total_system_exceptions__";
+ public static final String METRICS_TOTAL_USER_EXCEPTION =
"__total_user_exceptions__";
+ public static final String METRICS_TOTAL_DESERIALIZATION_EXCEPTION =
"__total_deserialization_exceptions__";
+ public static final String METRICS_TOTAL_SERIALIZATION_EXCEPTION =
"__total_serialization_exceptions__";
+ public static final String METRICS_AVG_LATENCY = "__avg_latency_ms__";
public JavaInstanceRunnable(InstanceConfig instanceConfig,
FunctionCacheManager fnCache,
@@ -418,17 +426,17 @@ public class JavaInstanceRunnable implements
AutoCloseable, Runnable {
private Builder createMetricsDataBuilder() {
InstanceCommunication.MetricsData.Builder bldr =
InstanceCommunication.MetricsData.newBuilder();
- addSystemMetrics("__total_processed__",
stats.getStats().getTotalProcessed(), bldr);
- addSystemMetrics("__total_successfully_processed__",
stats.getStats().getTotalSuccessfullyProcessed(),
+ addSystemMetrics(METRICS_TOTAL_PROCESSED,
stats.getStats().getTotalProcessed(), bldr);
+ addSystemMetrics(METRICS_TOTAL_SUCCESS,
stats.getStats().getTotalSuccessfullyProcessed(),
bldr);
- addSystemMetrics("__total_system_exceptions__",
stats.getStats().getTotalSystemExceptions(), bldr);
- addSystemMetrics("__total_user_exceptions__",
stats.getStats().getTotalUserExceptions(), bldr);
+ addSystemMetrics(METRICS_TOTAL_SYS_EXCEPTION,
stats.getStats().getTotalSystemExceptions(), bldr);
+ addSystemMetrics(METRICS_TOTAL_USER_EXCEPTION,
stats.getStats().getTotalUserExceptions(), bldr);
stats.getStats().getTotalDeserializationExceptions().forEach((topic,
count) -> {
- addSystemMetrics("__total_deserialization_exceptions__" + topic,
count, bldr);
+ addSystemMetrics(METRICS_TOTAL_DESERIALIZATION_EXCEPTION + topic,
count, bldr);
});
- addSystemMetrics("__total_serialization_exceptions__",
+ addSystemMetrics(METRICS_TOTAL_SERIALIZATION_EXCEPTION,
stats.getStats().getTotalSerializationExceptions(), bldr);
- addSystemMetrics("__avg_latency_ms__",
stats.getStats().computeLatency(), bldr);
+ addSystemMetrics(METRICS_AVG_LATENCY,
stats.getStats().computeLatency(), bldr);
return bldr;
}
diff --git a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
index 49c77d6..a16399e 100644
--- a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
+++ b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
@@ -39,7 +39,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
name='InstanceCommunication.proto',
package='proto',
syntax='proto3',
-
serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xa1\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01
\x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02
\x01(\t\x12\x13\n\x0bnumRestarts\x18\x03
\x01(\x03\x12\x14\n\x0cnumProcessed\x18\x04 \x01(\x03\x12
\n\x18numSuccessfullyProcessed\x18\x05
\x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06
\x01(\x03\x12H\n\x14latestUserExceptions\x18\x07
\x03(\x0b\x32*.proto.FunctionStatus.Ex [...]
+
serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xc6\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01
\x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02
\x01(\t\x12\x13\n\x0bnumRestarts\x18\x03
\x01(\x03\x12\x14\n\x0cnumProcessed\x18\x04 \x01(\x03\x12
\n\x18numSuccessfullyProcessed\x18\x05
\x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06
\x01(\x03\x12H\n\x14latestUserExceptions\x18\x07
\x03(\x0b\x32*.proto.FunctionStatus.Ex [...]
,
dependencies=[google_dot_protobuf_dot_empty__pb2.DESCRIPTOR,])
@@ -79,8 +79,8 @@ _FUNCTIONSTATUS_EXCEPTIONINFORMATION = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=606,
- serialized_end=675,
+ serialized_start=643,
+ serialized_end=712,
)
_FUNCTIONSTATUS_DESERIALIZATIONEXCEPTIONSENTRY = _descriptor.Descriptor(
@@ -116,8 +116,8 @@ _FUNCTIONSTATUS_DESERIALIZATIONEXCEPTIONSENTRY =
_descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=677,
- serialized_end=741,
+ serialized_start=714,
+ serialized_end=778,
)
_FUNCTIONSTATUS = _descriptor.Descriptor(
@@ -225,6 +225,13 @@ _FUNCTIONSTATUS = _descriptor.Descriptor(
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='metrics', full_name='proto.FunctionStatus.metrics', index=14,
+ number=15, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None, file=DESCRIPTOR),
],
extensions=[
],
@@ -238,7 +245,7 @@ _FUNCTIONSTATUS = _descriptor.Descriptor(
oneofs=[
],
serialized_start=68,
- serialized_end=741,
+ serialized_end=778,
)
@@ -268,8 +275,8 @@ _FUNCTIONSTATUSLIST = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=743,
- serialized_end=814,
+ serialized_start=780,
+ serialized_end=851,
)
@@ -320,8 +327,8 @@ _METRICSDATA_DATADIGEST = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=882,
- serialized_end=948,
+ serialized_start=919,
+ serialized_end=985,
)
_METRICSDATA_METRICSENTRY = _descriptor.Descriptor(
@@ -357,8 +364,8 @@ _METRICSDATA_METRICSENTRY = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=950,
- serialized_end=1027,
+ serialized_start=987,
+ serialized_end=1064,
)
_METRICSDATA = _descriptor.Descriptor(
@@ -387,8 +394,8 @@ _METRICSDATA = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=817,
- serialized_end=1027,
+ serialized_start=854,
+ serialized_end=1064,
)
@@ -418,8 +425,8 @@ _HEALTHCHECKRESULT = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1029,
- serialized_end=1065,
+ serialized_start=1066,
+ serialized_end=1102,
)
_FUNCTIONSTATUS_EXCEPTIONINFORMATION.containing_type = _FUNCTIONSTATUS
@@ -427,6 +434,7 @@
_FUNCTIONSTATUS_DESERIALIZATIONEXCEPTIONSENTRY.containing_type = _FUNCTIONSTATUS
_FUNCTIONSTATUS.fields_by_name['latestUserExceptions'].message_type =
_FUNCTIONSTATUS_EXCEPTIONINFORMATION
_FUNCTIONSTATUS.fields_by_name['latestSystemExceptions'].message_type =
_FUNCTIONSTATUS_EXCEPTIONINFORMATION
_FUNCTIONSTATUS.fields_by_name['deserializationExceptions'].message_type =
_FUNCTIONSTATUS_DESERIALIZATIONEXCEPTIONSENTRY
+_FUNCTIONSTATUS.fields_by_name['metrics'].message_type = _METRICSDATA
_FUNCTIONSTATUSLIST.fields_by_name['functionStatusList'].message_type =
_FUNCTIONSTATUS
_METRICSDATA_DATADIGEST.containing_type = _METRICSDATA
_METRICSDATA_METRICSENTRY.fields_by_name['value'].message_type =
_METRICSDATA_DATADIGEST
@@ -512,8 +520,8 @@ _INSTANCECONTROL = _descriptor.ServiceDescriptor(
file=DESCRIPTOR,
index=0,
options=None,
- serialized_start=1068,
- serialized_end=1416,
+ serialized_start=1105,
+ serialized_end=1453,
methods=[
_descriptor.MethodDescriptor(
name='GetFunctionStatus',
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py
index 70b78f0..bb16253 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -346,6 +346,7 @@ class PythonInstance(object):
status.serializationExceptions = self.total_stats.nserialization_exceptions
status.averageLatency = self.total_stats.compute_latency()
status.lastInvocationTime = self.total_stats.lastinvocationtime
+ status.metrics.CopyFrom(self.get_metrics())
return status
def join(self):
diff --git a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
index ed8c95a..1a80f5e 100644
--- a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
+++ b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
@@ -49,6 +49,7 @@ message FunctionStatus {
// expressed in ms since epoch
int64 lastInvocationTime = 13;
string instanceId = 14;
+ MetricsData metrics = 15;
}
message FunctionStatusList {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
index 3b53fb8..5cb8ce0 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
@@ -98,15 +98,24 @@ class ThreadRuntime implements Runtime {
@Override
public CompletableFuture<FunctionStatus> getFunctionStatus() {
+ CompletableFuture<FunctionStatus> statsFuture = new
CompletableFuture<>();
if (!isAlive()) {
FunctionStatus.Builder functionStatusBuilder =
FunctionStatus.newBuilder();
functionStatusBuilder.setRunning(false);
functionStatusBuilder.setFailureException(getDeathException().getMessage());
- return
CompletableFuture.completedFuture(functionStatusBuilder.build());
+ statsFuture.complete(functionStatusBuilder.build());
+ return statsFuture;
}
FunctionStatus.Builder functionStatusBuilder =
javaInstanceRunnable.getFunctionStatus();
functionStatusBuilder.setRunning(true);
- return
CompletableFuture.completedFuture(functionStatusBuilder.build());
+ getMetrics().handle((metrics, e) -> {
+ if (e == null) {
+ functionStatusBuilder.setMetrics(metrics);
+ }
+ statsFuture.complete(functionStatusBuilder.build());
+ return null;
+ });
+ return statsFuture;
}
@Override