This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7719b8e Make sure to properly count number of processed messages in
python (#3060)
7719b8e is described below
commit 7719b8ede5bb8d0e63ae51e22bb47d7b59c8b02d
Author: Sanjeev Kulkarni <[email protected]>
AuthorDate: Tue Nov 27 10:51:29 2018 -0800
Make sure to properly count number of processed messages in python (#3060)
* Make sure to properly count number of processed messages in python
* Removed total processed
* Fixed build
* Fixed build
* Address feedback
* Fixed unittest
* Removed unused value
* Added licence headers
* Removed unnecessary changes
* Fix integration tests
* Added numReceived as part of function status
* Unnecessary change revert
* Enhance test
---
.../apache/pulsar/io/PulsarFunctionE2ETest.java | 4 +-
.../functions/instance/FunctionStatsManager.java | 32 ---------
.../functions/instance/JavaInstanceRunnable.java | 6 +-
.../instance/src/main/python/Function_pb2.py | 1 -
.../src/main/python/InstanceCommunication_pb2.py | 78 +++++++++-------------
.../instance/src/main/python/function_stats.py | 28 ++------
.../instance/src/main/python/python_instance.py | 13 +---
.../src/main/proto/InstanceCommunication.proto | 10 +--
.../functions/worker/FunctionsStatsGenerator.java | 1 -
.../worker/FunctionStatsGeneratorTest.java | 10 +--
.../integration/functions/PulsarFunctionsTest.java | 7 +-
11 files changed, 54 insertions(+), 136 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index f9491f3..9214d25 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -525,10 +525,10 @@ public class PulsarFunctionE2ETest {
FunctionStatus stats =
functionStatus.getFunctionStatusListList().get(0);
- double count = stats.getNumProcessed();
+ double count = stats.getNumReceived();
double success = stats.getNumSuccessfullyProcessed();
String ownerWorkerId = stats.getWorkerId();
- assertEquals((int) count, totalMsgs);
+ assertEquals((int)count, totalMsgs);
assertEquals((int) success, totalMsgs);
assertEquals(ownerWorkerId, workerId);
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
index 4d66422..139208b 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
@@ -46,7 +46,6 @@ public class FunctionStatsManager implements AutoCloseable {
public final static String USER_METRIC_PREFIX = "user_metric_";
/** Declare metric names **/
- public static final String PROCESSED_TOTAL = "processed_total";
public static final String PROCESSED_SUCCESSFULLY_TOTAL =
"processed_successfully_total";
public static final String SYSTEM_EXCEPTIONS_TOTAL =
"system_exceptions_total";
public static final String USER_EXCEPTIONS_TOTAL = "user_exceptions_total";
@@ -54,7 +53,6 @@ public class FunctionStatsManager implements AutoCloseable {
public static final String LAST_INVOCATION = "last_invocation";
public static final String RECEIVED_TOTAL = "received_total";
- public static final String PROCESSED_TOTAL_1min = "processed_total_1min";
public static final String PROCESSED_SUCCESSFULLY_TOTAL_1min =
"processed_successfully_total_1min";
public static final String SYSTEM_EXCEPTIONS_TOTAL_1min =
"system_exceptions_total_1min";
public static final String USER_EXCEPTIONS_TOTAL_1min =
"user_exceptions_total_1min";
@@ -63,8 +61,6 @@ public class FunctionStatsManager implements AutoCloseable {
/** Declare Prometheus stats **/
- final Counter statTotalProcessed;
-
final Counter statTotalProcessedSuccessfully;
final Counter statTotalSysExceptions;
@@ -79,8 +75,6 @@ public class FunctionStatsManager implements AutoCloseable {
// windowed metrics
- final Counter statTotalProcessed1min;
-
final Counter statTotalProcessedSuccessfully1min;
final Counter statTotalSysExceptions1min;
@@ -104,12 +98,6 @@ public class FunctionStatsManager implements AutoCloseable {
this.metricsLabels = metricsLabels;
- statTotalProcessed = Counter.build()
- .name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_TOTAL)
- .help("Total number of messages processed.")
- .labelNames(metricsLabelNames)
- .register(collectorRegistry);
-
statTotalProcessedSuccessfully = Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX +
PROCESSED_SUCCESSFULLY_TOTAL)
.help("Total number of messages processed successfully.")
@@ -150,12 +138,6 @@ public class FunctionStatsManager implements AutoCloseable
{
.labelNames(metricsLabelNames)
.register(collectorRegistry);
- statTotalProcessed1min = Counter.build()
- .name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_TOTAL_1min)
- .help("Total number of messages processed in the last 1
minute.")
- .labelNames(metricsLabelNames)
- .register(collectorRegistry);
-
statTotalProcessedSuccessfully1min = Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX +
PROCESSED_SUCCESSFULLY_TOTAL_1min)
.help("Total number of messages processed successfully in the
last 1 minute.")
@@ -222,11 +204,6 @@ public class FunctionStatsManager implements AutoCloseable
{
statTotalRecordsRecieved1min.labels(metricsLabels).inc();
}
- public void incrTotalProcessed() {
- statTotalProcessed.labels(metricsLabels).inc();
- statTotalProcessed1min.labels(metricsLabels).inc();
- }
-
public void incrTotalProcessedSuccessfully() {
statTotalProcessedSuccessfully.labels(metricsLabels).inc();
statTotalProcessedSuccessfully1min.labels(metricsLabels).inc();
@@ -261,10 +238,6 @@ public class FunctionStatsManager implements AutoCloseable
{
}
}
- public double getTotalProcessed() {
- return statTotalProcessed.labels(metricsLabels).get();
- }
-
public double getTotalProcessedSuccessfully() {
return statTotalProcessedSuccessfully.labels(metricsLabels).get();
}
@@ -306,10 +279,6 @@ public class FunctionStatsManager implements AutoCloseable
{
return
statProcessLatency.labels(metricsLabels).get().quantiles.get(0.999);
}
- public double getTotalProcessed1min() {
- return statTotalProcessed1min.labels(metricsLabels).get();
- }
-
public double getTotalProcessedSuccessfully1min() {
return statTotalProcessedSuccessfully1min.labels(metricsLabels).get();
}
@@ -348,7 +317,6 @@ public class FunctionStatsManager implements AutoCloseable {
}
public void reset() {
- statTotalProcessed1min.clear();
statTotalProcessedSuccessfully1min.clear();
statTotalSysExceptions1min.clear();
statTotalUserExceptions1min.clear();
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 72dbb9f..1d70356 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -255,8 +255,6 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
// register end time
stats.processTimeEnd();
- // increment total processed
- stats.incrTotalProcessed();
removeLogTopicHandler();
@@ -520,7 +518,6 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
private Builder createMetricsDataBuilder() {
InstanceCommunication.MetricsData.Builder bldr =
InstanceCommunication.MetricsData.newBuilder();
- bldr.setProcessedTotal((long) stats.getTotalProcessed());
bldr.setProcessedSuccessfullyTotal((long)
stats.getTotalProcessedSuccessfully());
bldr.setSystemExceptionsTotal((long) stats.getTotalSysExceptions());
bldr.setUserExceptionsTotal((long) stats.getTotalUserExceptions());
@@ -528,7 +525,6 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
bldr.setAvgProcessLatency(stats.getAvgProcessLatency());
bldr.setLastInvocation((long) stats.getLastInvocation());
- bldr.setProcessedTotal1Min((long) stats.getTotalProcessed1min());
bldr.setProcessedSuccessfullyTotal1Min((long)
stats.getTotalProcessedSuccessfully1min());
bldr.setSystemExceptionsTotal1Min((long)
stats.getTotalSysExceptions1min());
bldr.setUserExceptionsTotal1Min((long)
stats.getTotalUserExceptions1min());
@@ -540,7 +536,7 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() {
InstanceCommunication.FunctionStatus.Builder functionStatusBuilder =
InstanceCommunication.FunctionStatus.newBuilder();
- functionStatusBuilder.setNumProcessed((long)
stats.getTotalProcessed());
+
functionStatusBuilder.setNumReceived((long)stats.getTotalRecordsReceived());
functionStatusBuilder.setNumSuccessfullyProcessed((long)
stats.getTotalProcessedSuccessfully());
functionStatusBuilder.setNumUserExceptions((long)
stats.getTotalUserExceptions());
stats.getLatestUserExceptions().forEach(ex -> {
diff --git a/pulsar-functions/instance/src/main/python/Function_pb2.py
b/pulsar-functions/instance/src/main/python/Function_pb2.py
index d6cc8af..3979d49 100644
--- a/pulsar-functions/instance/src/main/python/Function_pb2.py
+++ b/pulsar-functions/instance/src/main/python/Function_pb2.py
@@ -18,7 +18,6 @@
#
# Generated by the protocol buffer compiler. DO NOT EDIT!
-# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: Function.proto
import sys
diff --git
a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
index f88e6a3..2f7b376 100644
--- a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
+++ b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
@@ -39,7 +39,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
name='InstanceCommunication.proto',
package='proto',
syntax='proto3',
-
serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xb3\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01
\x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02
\x01(\t\x12\x13\n\x0bnumRestarts\x18\x03
\x01(\x03\x12\x14\n\x0cnumProcessed\x18\x04 \x01(\x03\x12
\n\x18numSuccessfullyProcessed\x18\x05
\x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06
\x01(\x03\x12H\n\x14latestUserExceptions\x18\x07
\x03(\x0b\x32*.proto.FunctionStatus.Ex [...]
+
serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xb2\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01
\x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02
\x01(\t\x12\x13\n\x0bnumRestarts\x18\x03
\x01(\x03\x12\x13\n\x0bnumReceived\x18\x11 \x01(\x03\x12
\n\x18numSuccessfullyProcessed\x18\x05
\x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06
\x01(\x03\x12H\n\x14latestUserExceptions\x18\x07
\x03(\x0b\x32*.proto.FunctionStatus.Exc [...]
,
dependencies=[google_dot_protobuf_dot_empty__pb2.DESCRIPTOR,])
@@ -79,8 +79,8 @@ _FUNCTIONSTATUS_EXCEPTIONINFORMATION = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=624,
- serialized_end=693,
+ serialized_start=623,
+ serialized_end=692,
)
_FUNCTIONSTATUS_DESERIALIZATIONEXCEPTIONSENTRY = _descriptor.Descriptor(
@@ -116,8 +116,8 @@ _FUNCTIONSTATUS_DESERIALIZATIONEXCEPTIONSENTRY =
_descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=695,
- serialized_end=759,
+ serialized_start=694,
+ serialized_end=758,
)
_FUNCTIONSTATUS = _descriptor.Descriptor(
@@ -149,8 +149,8 @@ _FUNCTIONSTATUS = _descriptor.Descriptor(
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
- name='numProcessed', full_name='proto.FunctionStatus.numProcessed',
index=3,
- number=4, type=3, cpp_type=2, label=1,
+ name='numReceived', full_name='proto.FunctionStatus.numReceived',
index=3,
+ number=17, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
@@ -245,7 +245,7 @@ _FUNCTIONSTATUS = _descriptor.Descriptor(
oneofs=[
],
serialized_start=68,
- serialized_end=759,
+ serialized_end=758,
)
@@ -282,8 +282,8 @@ _FUNCTIONSTATUSLIST = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=761,
- serialized_end=847,
+ serialized_start=760,
+ serialized_end=846,
)
@@ -320,8 +320,8 @@ _METRICSDATA_USERMETRICSENTRY = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1317,
- serialized_end=1367,
+ serialized_start=1263,
+ serialized_end=1313,
)
_METRICSDATA = _descriptor.Descriptor(
@@ -346,84 +346,70 @@ _METRICSDATA = _descriptor.Descriptor(
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
- name='processedTotal', full_name='proto.MetricsData.processedTotal',
index=2,
- number=3, type=3, cpp_type=2, label=1,
- has_default_value=False, default_value=0,
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
- _descriptor.FieldDescriptor(
- name='processedTotal_1min',
full_name='proto.MetricsData.processedTotal_1min', index=3,
- number=11, type=3, cpp_type=2, label=1,
- has_default_value=False, default_value=0,
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
- _descriptor.FieldDescriptor(
- name='processedSuccessfullyTotal',
full_name='proto.MetricsData.processedSuccessfullyTotal', index=4,
+ name='processedSuccessfullyTotal',
full_name='proto.MetricsData.processedSuccessfullyTotal', index=2,
number=4, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
- name='processedSuccessfullyTotal_1min',
full_name='proto.MetricsData.processedSuccessfullyTotal_1min', index=5,
+ name='processedSuccessfullyTotal_1min',
full_name='proto.MetricsData.processedSuccessfullyTotal_1min', index=3,
number=12, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
- name='systemExceptionsTotal',
full_name='proto.MetricsData.systemExceptionsTotal', index=6,
+ name='systemExceptionsTotal',
full_name='proto.MetricsData.systemExceptionsTotal', index=4,
number=5, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
- name='systemExceptionsTotal_1min',
full_name='proto.MetricsData.systemExceptionsTotal_1min', index=7,
+ name='systemExceptionsTotal_1min',
full_name='proto.MetricsData.systemExceptionsTotal_1min', index=5,
number=13, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
- name='userExceptionsTotal',
full_name='proto.MetricsData.userExceptionsTotal', index=8,
+ name='userExceptionsTotal',
full_name='proto.MetricsData.userExceptionsTotal', index=6,
number=6, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
- name='userExceptionsTotal_1min',
full_name='proto.MetricsData.userExceptionsTotal_1min', index=9,
+ name='userExceptionsTotal_1min',
full_name='proto.MetricsData.userExceptionsTotal_1min', index=7,
number=14, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
- name='avgProcessLatency',
full_name='proto.MetricsData.avgProcessLatency', index=10,
+ name='avgProcessLatency',
full_name='proto.MetricsData.avgProcessLatency', index=8,
number=7, type=1, cpp_type=5, label=1,
has_default_value=False, default_value=float(0),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
- name='avgProcessLatency_1min',
full_name='proto.MetricsData.avgProcessLatency_1min', index=11,
+ name='avgProcessLatency_1min',
full_name='proto.MetricsData.avgProcessLatency_1min', index=9,
number=15, type=1, cpp_type=5, label=1,
has_default_value=False, default_value=float(0),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
- name='lastInvocation', full_name='proto.MetricsData.lastInvocation',
index=12,
+ name='lastInvocation', full_name='proto.MetricsData.lastInvocation',
index=10,
number=8, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
- name='userMetrics', full_name='proto.MetricsData.userMetrics', index=13,
+ name='userMetrics', full_name='proto.MetricsData.userMetrics', index=11,
number=9, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
@@ -441,8 +427,8 @@ _METRICSDATA = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=850,
- serialized_end=1367,
+ serialized_start=849,
+ serialized_end=1313,
)
@@ -472,8 +458,8 @@ _HEALTHCHECKRESULT = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1369,
- serialized_end=1405,
+ serialized_start=1315,
+ serialized_end=1351,
)
@@ -517,8 +503,8 @@ _METRICS_INSTANCEMETRICS = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1468,
- serialized_end=1560,
+ serialized_start=1414,
+ serialized_end=1506,
)
_METRICS = _descriptor.Descriptor(
@@ -547,8 +533,8 @@ _METRICS = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1408,
- serialized_end=1560,
+ serialized_start=1354,
+ serialized_end=1506,
)
_FUNCTIONSTATUS_EXCEPTIONINFORMATION.containing_type = _FUNCTIONSTATUS
@@ -650,8 +636,8 @@ _INSTANCECONTROL = _descriptor.ServiceDescriptor(
file=DESCRIPTOR,
index=0,
options=None,
- serialized_start=1563,
- serialized_end=1911,
+ serialized_start=1509,
+ serialized_end=1857,
methods=[
_descriptor.MethodDescriptor(
name='GetFunctionStatus',
diff --git a/pulsar-functions/instance/src/main/python/function_stats.py
b/pulsar-functions/instance/src/main/python/function_stats.py
index f720a41..3d6e216 100644
--- a/pulsar-functions/instance/src/main/python/function_stats.py
+++ b/pulsar-functions/instance/src/main/python/function_stats.py
@@ -30,7 +30,6 @@ class Stats(object):
PULSAR_FUNCTION_METRICS_PREFIX = "pulsar_function_"
USER_METRIC_PREFIX = "user_metric_";
- TOTAL_PROCESSED = 'processed_total'
TOTAL_SUCCESSFULLY_PROCESSED = 'processed_successfully_total'
TOTAL_SYSTEM_EXCEPTIONS = 'system_exceptions_total'
TOTAL_USER_EXCEPTIONS = 'user_exceptions_total'
@@ -38,7 +37,6 @@ class Stats(object):
LAST_INVOCATION = 'last_invocation'
TOTAL_RECEIVED = 'received_total'
- TOTAL_PROCESSED_1min = 'processed_total_1min'
TOTAL_SUCCESSFULLY_PROCESSED_1min = 'processed_successfully_total_1min'
TOTAL_SYSTEM_EXCEPTIONS_1min = 'system_exceptions_total_1min'
TOTAL_USER_EXCEPTIONS_1min = 'user_exceptions_total_1min'
@@ -46,7 +44,6 @@ class Stats(object):
TOTAL_RECEIVED_1min = 'received_total_1min'
# Declare Prometheus
- stat_total_processed = Counter(PULSAR_FUNCTION_METRICS_PREFIX +
TOTAL_PROCESSED, 'Total number of messages processed.', metrics_label_names)
stat_total_processed_successfully = Counter(PULSAR_FUNCTION_METRICS_PREFIX +
TOTAL_SUCCESSFULLY_PROCESSED,
'Total number of messages
processed successfully.', metrics_label_names)
stat_total_sys_exceptions = Counter(PULSAR_FUNCTION_METRICS_PREFIX+
TOTAL_SYSTEM_EXCEPTIONS, 'Total number of system exceptions.',
@@ -62,8 +59,6 @@ class Stats(object):
# 1min windowed metrics
- stat_total_processed_1min = Counter(PULSAR_FUNCTION_METRICS_PREFIX +
TOTAL_PROCESSED_1min,
- 'Total number of messages processed in the
last 1 minute.', metrics_label_names)
stat_total_processed_successfully_1min =
Counter(PULSAR_FUNCTION_METRICS_PREFIX + TOTAL_SUCCESSFULLY_PROCESSED_1min,
'Total number of messages
processed successfully in the last 1 minute.', metrics_label_names)
stat_total_sys_exceptions_1min = Counter(PULSAR_FUNCTION_METRICS_PREFIX +
TOTAL_SYSTEM_EXCEPTIONS_1min,
@@ -92,9 +87,6 @@ class Stats(object):
def get_total_received(self):
return self.stat_total_received.labels(*self.metrics_labels)._value.get();
- def get_total_processed(self):
- return self.stat_total_processed.labels(*self.metrics_labels)._value.get();
-
def get_total_processed_successfully(self):
return
self.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get();
@@ -111,20 +103,17 @@ class Stats(object):
if process_latency_ms_count <= 0.0 \
else process_latency_ms_sum / process_latency_ms_count
- def get_total_received_1min(self):
- return
self.stat_total_received_1min.labels(*self.metrics_labels)._value.get();
-
- def get_total_processed_1min(self):
- return
self.stat_total_processed_1min.labels(*self.metrics_labels)._value.get();
-
def get_total_processed_successfully_1min(self):
- return
self.stat_total_processed_successfully_1min.labels(*self.metrics_labels)._value.get();
+ return
self.stat_total_processed_successfully_1min.labels(*self.metrics_labels)._value.get()
def get_total_sys_exceptions_1min(self):
- return
self.stat_total_sys_exceptions_1min.labels(*self.metrics_labels)._value.get();
+ return
self.stat_total_sys_exceptions_1min.labels(*self.metrics_labels)._value.get()
def get_total_user_exceptions_1min(self):
- return
self.stat_total_user_exceptions_1min.labels(*self.metrics_labels)._value.get();
+ return
self.stat_total_user_exceptions_1min.labels(*self.metrics_labels)._value.get()
+
+ def get_total_received_1min(self):
+ return
self.stat_total_received_1min.labels(*self.metrics_labels)._value.get()
def get_avg_process_latency_1min(self):
process_latency_ms_count =
self.stat_process_latency_ms_1min.labels(*self.metrics_labels)._count.get()
@@ -136,10 +125,6 @@ class Stats(object):
def get_last_invocation(self):
return self.stat_last_invocation.labels(*self.metrics_labels)._value.get()
- def incr_total_processed(self):
- self.stat_total_processed.labels(*self.metrics_labels).inc()
- self.stat_total_processed_1min.labels(*self.metrics_labels).inc()
-
def incr_total_processed_successfully(self):
self.stat_total_processed_successfully.labels(*self.metrics_labels).inc()
self.stat_total_processed_successfully_1min.labels(*self.metrics_labels).inc()
@@ -183,7 +168,6 @@ class Stats(object):
def reset(self):
self.latest_user_exception = []
self.latest_sys_exception = []
- self.stat_total_processed_1min.labels(*self.metrics_labels)._value.set(0.0)
self.stat_total_processed_successfully_1min.labels(*self.metrics_labels)._value.set(0.0)
self.stat_total_user_exceptions_1min.labels(*self.metrics_labels)._value.set(0.0)
self.stat_total_sys_exceptions_1min.labels(*self.metrics_labels)._value.set(0.0)
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py
b/pulsar-functions/instance/src/main/python/python_instance.py
index ec85af1..0fc3601 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -211,12 +211,9 @@ class PythonInstance(object):
# stop timer for process time
self.stats.process_time_end()
-
- # incr total processed stat
- self.stats.incr_total_processed()
except Exception as e:
Log.exception("Exception while executing user method")
- self.stats.incr_total_user_exceptions();
+ self.stats.incr_total_user_exceptions()
if self.log_topic_handler is not None:
log.remove_all_handlers()
@@ -292,7 +289,6 @@ class PythonInstance(object):
def get_metrics(self):
total_received = self.stats.get_total_received()
- total_processed = self.stats.get_total_processed()
total_processed_successfully =
self.stats.get_total_processed_successfully()
total_user_exceptions = self.stats.get_total_user_exceptions()
total_sys_exceptions = self.stats.get_total_sys_exceptions()
@@ -300,7 +296,6 @@ class PythonInstance(object):
last_invocation = self.stats.get_last_invocation()
total_received_1min = self.stats.get_total_received_1min()
- total_processed_1min = self.stats.get_total_processed_1min()
total_processed_successfully_1min =
self.stats.get_total_processed_successfully_1min()
total_user_exceptions_1min = self.stats.get_total_user_exceptions_1min()
total_sys_exceptions_1min = self.stats.get_total_sys_exceptions_1min()
@@ -309,7 +304,6 @@ class PythonInstance(object):
metrics_data = InstanceCommunication_pb2.MetricsData()
# total metrics
metrics_data.receivedTotal = int(total_received) if sys.version_info.major
>= 3 else long(total_received)
- metrics_data.processedTotal = int(total_processed) if
sys.version_info.major >= 3 else long(total_processed)
metrics_data.processedSuccessfullyTotal =
int(total_processed_successfully) if sys.version_info.major >= 3 else
long(total_processed_successfully)
metrics_data.systemExceptionsTotal = int(total_sys_exceptions) if
sys.version_info.major >= 3 else long(total_sys_exceptions)
metrics_data.userExceptionsTotal = int(total_user_exceptions) if
sys.version_info.major >= 3 else long(total_user_exceptions)
@@ -317,7 +311,6 @@ class PythonInstance(object):
metrics_data.lastInvocation = int(last_invocation) if
sys.version_info.major >= 3 else long(last_invocation)
# 1min metrics
metrics_data.receivedTotal_1min = int(total_received_1min) if
sys.version_info.major >= 3 else long(total_received_1min)
- metrics_data.processedTotal_1min = int(total_processed_1min) if
sys.version_info.major >= 3 else long(total_processed_1min)
metrics_data.processedSuccessfullyTotal_1min = int(
total_processed_successfully_1min) if sys.version_info.major >= 3 else
long(total_processed_successfully_1min)
metrics_data.systemExceptionsTotal_1min = int(total_sys_exceptions_1min)
if sys.version_info.major >= 3 else long(
@@ -343,14 +336,14 @@ class PythonInstance(object):
status = InstanceCommunication_pb2.FunctionStatus()
status.running = True
- total_processed = self.stats.get_total_processed()
+ total_received = self.stats.get_total_received()
total_processed_successfully =
self.stats.get_total_processed_successfully()
total_user_exceptions = self.stats.get_total_user_exceptions()
total_sys_exceptions = self.stats.get_total_sys_exceptions()
avg_process_latency_ms = self.stats.get_avg_process_latency()
last_invocation = self.stats.get_last_invocation()
- status.numProcessed = int(total_processed) if sys.version_info.major >= 3
else long(total_processed)
+ status.numReceived = int(total_received) if sys.version_info.major >= 3
else long(total_received)
status.numSuccessfullyProcessed = int(total_processed_successfully) if
sys.version_info.major >= 3 else long(total_processed_successfully)
status.numUserExceptions = int(total_user_exceptions) if
sys.version_info.major >= 3 else long(total_user_exceptions)
status.instanceId = self.instance_config.instance_id
diff --git a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
index 05d1399..e5359f8 100644
--- a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
+++ b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
@@ -32,7 +32,8 @@ message FunctionStatus {
bool running = 1;
string failureException = 2;
int64 numRestarts = 3;
- int64 numProcessed = 4;
+ // int64 numProcessed = 4;
+ int64 numReceived = 17;
int64 numSuccessfullyProcessed = 5;
int64 numUserExceptions = 6;
repeated ExceptionInformation latestUserExceptions = 7;
@@ -74,9 +75,10 @@ message MetricsData {
int64 receivedTotal_1min = 10;
// Total number of records processed
- int64 processedTotal = 3;
+ // No longer used because processedSuccessfullyTotal and
userExceptionsTotal add to it
+ // int64 processedTotal = 3;
- int64 processedTotal_1min = 11;
+ // int64 processedTotal_1min = 11;
// Total number of records successfully processed by user function
int64 processedSuccessfullyTotal = 4;
@@ -124,4 +126,4 @@ message Metrics {
MetricsData metricsData = 3;
}
repeated InstanceMetrics metrics = 1;
-}
\ No newline at end of file
+}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
index 77228ac..2b8e0fd 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
@@ -71,7 +71,6 @@ public class FunctionsStatsGenerator {
metric(out, cluster, qualifiedNamespace, name,
FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX +
FunctionStatsManager.PROCESS_LATENCY_MS, instanceId,
metrics.getAvgProcessLatency());
metric(out, cluster, qualifiedNamespace, name,
FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX +
FunctionStatsManager.LAST_INVOCATION, instanceId, metrics.getLastInvocation());
metric(out, cluster, qualifiedNamespace, name,
FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX +
FunctionStatsManager.PROCESSED_SUCCESSFULLY_TOTAL, instanceId,
metrics.getProcessedSuccessfullyTotal());
- metric(out, cluster, qualifiedNamespace, name,
FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX +
FunctionStatsManager.PROCESSED_TOTAL, instanceId, metrics.getProcessedTotal());
metric(out, cluster, qualifiedNamespace, name,
FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX +
FunctionStatsManager.RECEIVED_TOTAL, instanceId, metrics.getReceivedTotal());
metric(out, cluster, qualifiedNamespace, name,
FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX +
FunctionStatsManager.SYSTEM_EXCEPTIONS_TOTAL, instanceId,
metrics.getSystemExceptionsTotal());
metric(out, cluster, qualifiedNamespace, name,
FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX +
FunctionStatsManager.USER_EXCEPTIONS_TOTAL, instanceId,
metrics.getUserExceptionsTotal());
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
index efc2974..4b54bcf 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
@@ -88,7 +88,6 @@ public class FunctionStatsGeneratorTest {
CompletableFuture<InstanceCommunication.MetricsData>
metricsDataCompletableFuture = new CompletableFuture<>();
InstanceCommunication.MetricsData metricsData =
InstanceCommunication.MetricsData.newBuilder()
.setReceivedTotal(101)
- .setProcessedTotal(100)
.setProcessedSuccessfullyTotal(99)
.setAvgProcessLatency(10.0)
.setUserExceptionsTotal(3)
@@ -126,7 +125,7 @@ public class FunctionStatsGeneratorTest {
buf.release();
Map<String, Metric> metrics = parseMetrics(str);
- Assert.assertEquals(metrics.size(), 7);
+ Assert.assertEquals(metrics.size(), 6);
System.out.println("metrics: " + metrics);
Metric m = metrics.get("pulsar_function_received_total");
@@ -136,13 +135,6 @@ public class FunctionStatsGeneratorTest {
assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
assertEquals(m.value, 101.0);
- m = metrics.get("pulsar_function_processed_total");
- assertEquals(m.tags.get("cluster"), "default");
- assertEquals(m.tags.get("instanceId"), "0");
- assertEquals(m.tags.get("name"), "func-1");
- assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
- assertEquals(m.value, 100.0);
-
m = metrics.get("pulsar_function_user_exceptions_total");
assertEquals(m.tags.get("cluster"), "default");
assertEquals(m.tags.get("instanceId"), "0");
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 0bfeb90..e72963e 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -333,7 +333,7 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
try {
ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
log.info("Get sink status : {}", result.getStdout());
- if (result.getStdout().contains("\"numProcessed\": \"" +
numMessages + "\"")) {
+ if (result.getStdout().contains("\"numSuccessfullyProcessed\":
\"" + numMessages + "\"")) {
return;
}
} catch (ContainerExecException e) {
@@ -532,7 +532,7 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
try {
ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
log.info("Get source status : {}", result.getStdout());
- if (result.getStdout().contains("\"numProcessed\": \"" +
numMessages + "\"")) {
+ if (result.getStdout().contains("\"numSuccessfullyProcessed\":
\"" + numMessages + "\"")) {
return;
}
} catch (ContainerExecException e) {
@@ -561,7 +561,7 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
try {
ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
log.info("Get sink status : {}", result.getStdout());
- if (result.getStdout().contains("\"numProcessed\": \"" +
numMessages + "\"")) {
+ if (result.getStdout().contains("\"numSuccessfullyProcessed\":
\"" + numMessages + "\"")) {
return;
}
} catch (ContainerExecException e) {
@@ -913,7 +913,6 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
"--name", functionName
);
assertTrue(result.getStdout().contains("\"running\": true"));
- assertTrue(result.getStdout().contains("\"numProcessed\": \"" +
numMessages + "\""));
assertTrue(result.getStdout().contains("\"numSuccessfullyProcessed\":
\"" + numMessages + "\""));
}