This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7719b8e  Make sure to properly count number of processed messages in python (#3060)
7719b8e is described below

commit 7719b8ede5bb8d0e63ae51e22bb47d7b59c8b02d
Author: Sanjeev Kulkarni <[email protected]>
AuthorDate: Tue Nov 27 10:51:29 2018 -0800

    Make sure to properly count number of processed messages in python (#3060)
    
    * Make sure to properly count number of processed messages in python
    
    * Removed total processed
    
    * Fixed build
    
    * Fixed build
    
    * Address feedback
    
    * Fixed unittest
    
    * Removed unused value
    
    * Added licence headers
    
    * Removed unnecessary changes
    
    * Fix integration tests
    
    * Added numReceived as part of function status
    
    * Unnecessary change revert
    
    * Enhance test
---
 .../apache/pulsar/io/PulsarFunctionE2ETest.java    |  4 +-
 .../functions/instance/FunctionStatsManager.java   | 32 ---------
 .../functions/instance/JavaInstanceRunnable.java   |  6 +-
 .../instance/src/main/python/Function_pb2.py       |  1 -
 .../src/main/python/InstanceCommunication_pb2.py   | 78 +++++++++-------------
 .../instance/src/main/python/function_stats.py     | 28 ++------
 .../instance/src/main/python/python_instance.py    | 13 +---
 .../src/main/proto/InstanceCommunication.proto     | 10 +--
 .../functions/worker/FunctionsStatsGenerator.java  |  1 -
 .../worker/FunctionStatsGeneratorTest.java         | 10 +--
 .../integration/functions/PulsarFunctionsTest.java |  7 +-
 11 files changed, 54 insertions(+), 136 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index f9491f3..9214d25 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -525,10 +525,10 @@ public class PulsarFunctionE2ETest {
 
         FunctionStatus stats = 
functionStatus.getFunctionStatusListList().get(0);
 
-        double count = stats.getNumProcessed();
+        double count = stats.getNumReceived();
         double success = stats.getNumSuccessfullyProcessed();
         String ownerWorkerId = stats.getWorkerId();
-        assertEquals((int) count, totalMsgs);
+        assertEquals((int)count, totalMsgs);
         assertEquals((int) success, totalMsgs);
         assertEquals(ownerWorkerId, workerId);
     }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
index 4d66422..139208b 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
@@ -46,7 +46,6 @@ public class FunctionStatsManager implements AutoCloseable {
     public final static String USER_METRIC_PREFIX = "user_metric_";
 
     /** Declare metric names **/
-    public static final String PROCESSED_TOTAL = "processed_total";
     public static final String PROCESSED_SUCCESSFULLY_TOTAL = 
"processed_successfully_total";
     public static final String SYSTEM_EXCEPTIONS_TOTAL = 
"system_exceptions_total";
     public static final String USER_EXCEPTIONS_TOTAL = "user_exceptions_total";
@@ -54,7 +53,6 @@ public class FunctionStatsManager implements AutoCloseable {
     public static final String LAST_INVOCATION = "last_invocation";
     public static final String RECEIVED_TOTAL = "received_total";
 
-    public static final String PROCESSED_TOTAL_1min = "processed_total_1min";
     public static final String PROCESSED_SUCCESSFULLY_TOTAL_1min = 
"processed_successfully_total_1min";
     public static final String SYSTEM_EXCEPTIONS_TOTAL_1min = 
"system_exceptions_total_1min";
     public static final String USER_EXCEPTIONS_TOTAL_1min = 
"user_exceptions_total_1min";
@@ -63,8 +61,6 @@ public class FunctionStatsManager implements AutoCloseable {
 
     /** Declare Prometheus stats **/
 
-    final Counter statTotalProcessed;
-
     final Counter statTotalProcessedSuccessfully;
 
     final Counter statTotalSysExceptions;
@@ -79,8 +75,6 @@ public class FunctionStatsManager implements AutoCloseable {
     
     // windowed metrics
 
-    final Counter statTotalProcessed1min;
-
     final Counter statTotalProcessedSuccessfully1min;
 
     final Counter statTotalSysExceptions1min;
@@ -104,12 +98,6 @@ public class FunctionStatsManager implements AutoCloseable {
 
         this.metricsLabels = metricsLabels;
 
-        statTotalProcessed = Counter.build()
-                .name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_TOTAL)
-                .help("Total number of messages processed.")
-                .labelNames(metricsLabelNames)
-                .register(collectorRegistry);
-
         statTotalProcessedSuccessfully = Counter.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + 
PROCESSED_SUCCESSFULLY_TOTAL)
                 .help("Total number of messages processed successfully.")
@@ -150,12 +138,6 @@ public class FunctionStatsManager implements AutoCloseable 
{
                 .labelNames(metricsLabelNames)
                 .register(collectorRegistry);
 
-        statTotalProcessed1min = Counter.build()
-                .name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_TOTAL_1min)
-                .help("Total number of messages processed in the last 1 
minute.")
-                .labelNames(metricsLabelNames)
-                .register(collectorRegistry);
-
         statTotalProcessedSuccessfully1min = Counter.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + 
PROCESSED_SUCCESSFULLY_TOTAL_1min)
                 .help("Total number of messages processed successfully in the 
last 1 minute.")
@@ -222,11 +204,6 @@ public class FunctionStatsManager implements AutoCloseable 
{
         statTotalRecordsRecieved1min.labels(metricsLabels).inc();
     }
 
-    public void incrTotalProcessed() {
-        statTotalProcessed.labels(metricsLabels).inc();
-        statTotalProcessed1min.labels(metricsLabels).inc();
-    }
-
     public void incrTotalProcessedSuccessfully() {
         statTotalProcessedSuccessfully.labels(metricsLabels).inc();
         statTotalProcessedSuccessfully1min.labels(metricsLabels).inc();
@@ -261,10 +238,6 @@ public class FunctionStatsManager implements AutoCloseable 
{
         }
     }
 
-    public double getTotalProcessed() {
-        return statTotalProcessed.labels(metricsLabels).get();
-    }
-
     public double getTotalProcessedSuccessfully() {
         return statTotalProcessedSuccessfully.labels(metricsLabels).get();
     }
@@ -306,10 +279,6 @@ public class FunctionStatsManager implements AutoCloseable 
{
         return 
statProcessLatency.labels(metricsLabels).get().quantiles.get(0.999);
     }
 
-    public double getTotalProcessed1min() {
-        return statTotalProcessed1min.labels(metricsLabels).get();
-    }
-
     public double getTotalProcessedSuccessfully1min() {
         return statTotalProcessedSuccessfully1min.labels(metricsLabels).get();
     }
@@ -348,7 +317,6 @@ public class FunctionStatsManager implements AutoCloseable {
     }
 
     public void reset() {
-        statTotalProcessed1min.clear();
         statTotalProcessedSuccessfully1min.clear();
         statTotalSysExceptions1min.clear();
         statTotalUserExceptions1min.clear();
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 72dbb9f..1d70356 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -255,8 +255,6 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
 
                 // register end time
                 stats.processTimeEnd();
-                // increment total processed
-                stats.incrTotalProcessed();
 
                 removeLogTopicHandler();
 
@@ -520,7 +518,6 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
     private Builder createMetricsDataBuilder() {
         InstanceCommunication.MetricsData.Builder bldr = 
InstanceCommunication.MetricsData.newBuilder();
 
-        bldr.setProcessedTotal((long) stats.getTotalProcessed());
         bldr.setProcessedSuccessfullyTotal((long) 
stats.getTotalProcessedSuccessfully());
         bldr.setSystemExceptionsTotal((long) stats.getTotalSysExceptions());
         bldr.setUserExceptionsTotal((long) stats.getTotalUserExceptions());
@@ -528,7 +525,6 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
         bldr.setAvgProcessLatency(stats.getAvgProcessLatency());
         bldr.setLastInvocation((long) stats.getLastInvocation());
 
-        bldr.setProcessedTotal1Min((long) stats.getTotalProcessed1min());
         bldr.setProcessedSuccessfullyTotal1Min((long) 
stats.getTotalProcessedSuccessfully1min());
         bldr.setSystemExceptionsTotal1Min((long) 
stats.getTotalSysExceptions1min());
         bldr.setUserExceptionsTotal1Min((long) 
stats.getTotalUserExceptions1min());
@@ -540,7 +536,7 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
 
     public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() {
         InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = 
InstanceCommunication.FunctionStatus.newBuilder();
-        functionStatusBuilder.setNumProcessed((long) 
stats.getTotalProcessed());
+        
functionStatusBuilder.setNumReceived((long)stats.getTotalRecordsReceived());
         functionStatusBuilder.setNumSuccessfullyProcessed((long) 
stats.getTotalProcessedSuccessfully());
         functionStatusBuilder.setNumUserExceptions((long) 
stats.getTotalUserExceptions());
         stats.getLatestUserExceptions().forEach(ex -> {
diff --git a/pulsar-functions/instance/src/main/python/Function_pb2.py 
b/pulsar-functions/instance/src/main/python/Function_pb2.py
index d6cc8af..3979d49 100644
--- a/pulsar-functions/instance/src/main/python/Function_pb2.py
+++ b/pulsar-functions/instance/src/main/python/Function_pb2.py
@@ -18,7 +18,6 @@
 #
 
 # Generated by the protocol buffer compiler.  DO NOT EDIT!
-# Generated by the protocol buffer compiler.  DO NOT EDIT!
 # source: Function.proto
 
 import sys
diff --git 
a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py 
b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
index f88e6a3..2f7b376 100644
--- a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
+++ b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
@@ -39,7 +39,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
   name='InstanceCommunication.proto',
   package='proto',
   syntax='proto3',
-  
serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xb3\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01
 \x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02 
\x01(\t\x12\x13\n\x0bnumRestarts\x18\x03 
\x01(\x03\x12\x14\n\x0cnumProcessed\x18\x04 \x01(\x03\x12 
\n\x18numSuccessfullyProcessed\x18\x05 
\x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06 
\x01(\x03\x12H\n\x14latestUserExceptions\x18\x07 
\x03(\x0b\x32*.proto.FunctionStatus.Ex [...]
+  
serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xb2\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01
 \x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02 
\x01(\t\x12\x13\n\x0bnumRestarts\x18\x03 
\x01(\x03\x12\x13\n\x0bnumReceived\x18\x11 \x01(\x03\x12 
\n\x18numSuccessfullyProcessed\x18\x05 
\x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06 
\x01(\x03\x12H\n\x14latestUserExceptions\x18\x07 
\x03(\x0b\x32*.proto.FunctionStatus.Exc [...]
   ,
   dependencies=[google_dot_protobuf_dot_empty__pb2.DESCRIPTOR,])
 
@@ -79,8 +79,8 @@ _FUNCTIONSTATUS_EXCEPTIONINFORMATION = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=624,
-  serialized_end=693,
+  serialized_start=623,
+  serialized_end=692,
 )
 
 _FUNCTIONSTATUS_DESERIALIZATIONEXCEPTIONSENTRY = _descriptor.Descriptor(
@@ -116,8 +116,8 @@ _FUNCTIONSTATUS_DESERIALIZATIONEXCEPTIONSENTRY = 
_descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=695,
-  serialized_end=759,
+  serialized_start=694,
+  serialized_end=758,
 )
 
 _FUNCTIONSTATUS = _descriptor.Descriptor(
@@ -149,8 +149,8 @@ _FUNCTIONSTATUS = _descriptor.Descriptor(
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='numProcessed', full_name='proto.FunctionStatus.numProcessed', 
index=3,
-      number=4, type=3, cpp_type=2, label=1,
+      name='numReceived', full_name='proto.FunctionStatus.numReceived', 
index=3,
+      number=17, type=3, cpp_type=2, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
@@ -245,7 +245,7 @@ _FUNCTIONSTATUS = _descriptor.Descriptor(
   oneofs=[
   ],
   serialized_start=68,
-  serialized_end=759,
+  serialized_end=758,
 )
 
 
@@ -282,8 +282,8 @@ _FUNCTIONSTATUSLIST = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=761,
-  serialized_end=847,
+  serialized_start=760,
+  serialized_end=846,
 )
 
 
@@ -320,8 +320,8 @@ _METRICSDATA_USERMETRICSENTRY = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1317,
-  serialized_end=1367,
+  serialized_start=1263,
+  serialized_end=1313,
 )
 
 _METRICSDATA = _descriptor.Descriptor(
@@ -346,84 +346,70 @@ _METRICSDATA = _descriptor.Descriptor(
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='processedTotal', full_name='proto.MetricsData.processedTotal', 
index=2,
-      number=3, type=3, cpp_type=2, label=1,
-      has_default_value=False, default_value=0,
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
-    _descriptor.FieldDescriptor(
-      name='processedTotal_1min', 
full_name='proto.MetricsData.processedTotal_1min', index=3,
-      number=11, type=3, cpp_type=2, label=1,
-      has_default_value=False, default_value=0,
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
-    _descriptor.FieldDescriptor(
-      name='processedSuccessfullyTotal', 
full_name='proto.MetricsData.processedSuccessfullyTotal', index=4,
+      name='processedSuccessfullyTotal', 
full_name='proto.MetricsData.processedSuccessfullyTotal', index=2,
       number=4, type=3, cpp_type=2, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='processedSuccessfullyTotal_1min', 
full_name='proto.MetricsData.processedSuccessfullyTotal_1min', index=5,
+      name='processedSuccessfullyTotal_1min', 
full_name='proto.MetricsData.processedSuccessfullyTotal_1min', index=3,
       number=12, type=3, cpp_type=2, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='systemExceptionsTotal', 
full_name='proto.MetricsData.systemExceptionsTotal', index=6,
+      name='systemExceptionsTotal', 
full_name='proto.MetricsData.systemExceptionsTotal', index=4,
       number=5, type=3, cpp_type=2, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='systemExceptionsTotal_1min', 
full_name='proto.MetricsData.systemExceptionsTotal_1min', index=7,
+      name='systemExceptionsTotal_1min', 
full_name='proto.MetricsData.systemExceptionsTotal_1min', index=5,
       number=13, type=3, cpp_type=2, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='userExceptionsTotal', 
full_name='proto.MetricsData.userExceptionsTotal', index=8,
+      name='userExceptionsTotal', 
full_name='proto.MetricsData.userExceptionsTotal', index=6,
       number=6, type=3, cpp_type=2, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='userExceptionsTotal_1min', 
full_name='proto.MetricsData.userExceptionsTotal_1min', index=9,
+      name='userExceptionsTotal_1min', 
full_name='proto.MetricsData.userExceptionsTotal_1min', index=7,
       number=14, type=3, cpp_type=2, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='avgProcessLatency', 
full_name='proto.MetricsData.avgProcessLatency', index=10,
+      name='avgProcessLatency', 
full_name='proto.MetricsData.avgProcessLatency', index=8,
       number=7, type=1, cpp_type=5, label=1,
       has_default_value=False, default_value=float(0),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='avgProcessLatency_1min', 
full_name='proto.MetricsData.avgProcessLatency_1min', index=11,
+      name='avgProcessLatency_1min', 
full_name='proto.MetricsData.avgProcessLatency_1min', index=9,
       number=15, type=1, cpp_type=5, label=1,
       has_default_value=False, default_value=float(0),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='lastInvocation', full_name='proto.MetricsData.lastInvocation', 
index=12,
+      name='lastInvocation', full_name='proto.MetricsData.lastInvocation', 
index=10,
       number=8, type=3, cpp_type=2, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='userMetrics', full_name='proto.MetricsData.userMetrics', index=13,
+      name='userMetrics', full_name='proto.MetricsData.userMetrics', index=11,
       number=9, type=11, cpp_type=10, label=3,
       has_default_value=False, default_value=[],
       message_type=None, enum_type=None, containing_type=None,
@@ -441,8 +427,8 @@ _METRICSDATA = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=850,
-  serialized_end=1367,
+  serialized_start=849,
+  serialized_end=1313,
 )
 
 
@@ -472,8 +458,8 @@ _HEALTHCHECKRESULT = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1369,
-  serialized_end=1405,
+  serialized_start=1315,
+  serialized_end=1351,
 )
 
 
@@ -517,8 +503,8 @@ _METRICS_INSTANCEMETRICS = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1468,
-  serialized_end=1560,
+  serialized_start=1414,
+  serialized_end=1506,
 )
 
 _METRICS = _descriptor.Descriptor(
@@ -547,8 +533,8 @@ _METRICS = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1408,
-  serialized_end=1560,
+  serialized_start=1354,
+  serialized_end=1506,
 )
 
 _FUNCTIONSTATUS_EXCEPTIONINFORMATION.containing_type = _FUNCTIONSTATUS
@@ -650,8 +636,8 @@ _INSTANCECONTROL = _descriptor.ServiceDescriptor(
   file=DESCRIPTOR,
   index=0,
   options=None,
-  serialized_start=1563,
-  serialized_end=1911,
+  serialized_start=1509,
+  serialized_end=1857,
   methods=[
   _descriptor.MethodDescriptor(
     name='GetFunctionStatus',
diff --git a/pulsar-functions/instance/src/main/python/function_stats.py 
b/pulsar-functions/instance/src/main/python/function_stats.py
index f720a41..3d6e216 100644
--- a/pulsar-functions/instance/src/main/python/function_stats.py
+++ b/pulsar-functions/instance/src/main/python/function_stats.py
@@ -30,7 +30,6 @@ class Stats(object):
   PULSAR_FUNCTION_METRICS_PREFIX = "pulsar_function_"
   USER_METRIC_PREFIX = "user_metric_";
 
-  TOTAL_PROCESSED = 'processed_total'
   TOTAL_SUCCESSFULLY_PROCESSED = 'processed_successfully_total'
   TOTAL_SYSTEM_EXCEPTIONS = 'system_exceptions_total'
   TOTAL_USER_EXCEPTIONS = 'user_exceptions_total'
@@ -38,7 +37,6 @@ class Stats(object):
   LAST_INVOCATION = 'last_invocation'
   TOTAL_RECEIVED = 'received_total'
 
-  TOTAL_PROCESSED_1min = 'processed_total_1min'
   TOTAL_SUCCESSFULLY_PROCESSED_1min = 'processed_successfully_total_1min'
   TOTAL_SYSTEM_EXCEPTIONS_1min = 'system_exceptions_total_1min'
   TOTAL_USER_EXCEPTIONS_1min = 'user_exceptions_total_1min'
@@ -46,7 +44,6 @@ class Stats(object):
   TOTAL_RECEIVED_1min = 'received_total_1min'
 
   # Declare Prometheus
-  stat_total_processed = Counter(PULSAR_FUNCTION_METRICS_PREFIX + 
TOTAL_PROCESSED, 'Total number of messages processed.', metrics_label_names)
   stat_total_processed_successfully = Counter(PULSAR_FUNCTION_METRICS_PREFIX + 
TOTAL_SUCCESSFULLY_PROCESSED,
                                               'Total number of messages 
processed successfully.', metrics_label_names)
   stat_total_sys_exceptions = Counter(PULSAR_FUNCTION_METRICS_PREFIX+ 
TOTAL_SYSTEM_EXCEPTIONS, 'Total number of system exceptions.',
@@ -62,8 +59,6 @@ class Stats(object):
 
 
   # 1min windowed metrics
-  stat_total_processed_1min = Counter(PULSAR_FUNCTION_METRICS_PREFIX + 
TOTAL_PROCESSED_1min,
-                                 'Total number of messages processed in the 
last 1 minute.', metrics_label_names)
   stat_total_processed_successfully_1min = 
Counter(PULSAR_FUNCTION_METRICS_PREFIX + TOTAL_SUCCESSFULLY_PROCESSED_1min,
                                               'Total number of messages 
processed successfully in the last 1 minute.', metrics_label_names)
   stat_total_sys_exceptions_1min = Counter(PULSAR_FUNCTION_METRICS_PREFIX + 
TOTAL_SYSTEM_EXCEPTIONS_1min,
@@ -92,9 +87,6 @@ class Stats(object):
   def get_total_received(self):
     return self.stat_total_received.labels(*self.metrics_labels)._value.get();
 
-  def get_total_processed(self):
-    return self.stat_total_processed.labels(*self.metrics_labels)._value.get();
-
   def get_total_processed_successfully(self):
     return 
self.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get();
 
@@ -111,20 +103,17 @@ class Stats(object):
       if process_latency_ms_count <= 0.0 \
       else process_latency_ms_sum / process_latency_ms_count
 
-  def get_total_received_1min(self):
-    return 
self.stat_total_received_1min.labels(*self.metrics_labels)._value.get();
-
-  def get_total_processed_1min(self):
-    return 
self.stat_total_processed_1min.labels(*self.metrics_labels)._value.get();
-
   def get_total_processed_successfully_1min(self):
-    return 
self.stat_total_processed_successfully_1min.labels(*self.metrics_labels)._value.get();
+    return 
self.stat_total_processed_successfully_1min.labels(*self.metrics_labels)._value.get()
 
   def get_total_sys_exceptions_1min(self):
-    return 
self.stat_total_sys_exceptions_1min.labels(*self.metrics_labels)._value.get();
+    return 
self.stat_total_sys_exceptions_1min.labels(*self.metrics_labels)._value.get()
 
   def get_total_user_exceptions_1min(self):
-    return 
self.stat_total_user_exceptions_1min.labels(*self.metrics_labels)._value.get();
+    return 
self.stat_total_user_exceptions_1min.labels(*self.metrics_labels)._value.get()
+
+  def get_total_received_1min(self):
+    return 
self.stat_total_received_1min.labels(*self.metrics_labels)._value.get()
 
   def get_avg_process_latency_1min(self):
     process_latency_ms_count = 
self.stat_process_latency_ms_1min.labels(*self.metrics_labels)._count.get()
@@ -136,10 +125,6 @@ class Stats(object):
   def get_last_invocation(self):
     return self.stat_last_invocation.labels(*self.metrics_labels)._value.get()
 
-  def incr_total_processed(self):
-    self.stat_total_processed.labels(*self.metrics_labels).inc()
-    self.stat_total_processed_1min.labels(*self.metrics_labels).inc()
-
   def incr_total_processed_successfully(self):
     self.stat_total_processed_successfully.labels(*self.metrics_labels).inc()
     
self.stat_total_processed_successfully_1min.labels(*self.metrics_labels).inc()
@@ -183,7 +168,6 @@ class Stats(object):
   def reset(self):
     self.latest_user_exception = []
     self.latest_sys_exception = []
-    self.stat_total_processed_1min.labels(*self.metrics_labels)._value.set(0.0)
     
self.stat_total_processed_successfully_1min.labels(*self.metrics_labels)._value.set(0.0)
     
self.stat_total_user_exceptions_1min.labels(*self.metrics_labels)._value.set(0.0)
     
self.stat_total_sys_exceptions_1min.labels(*self.metrics_labels)._value.set(0.0)
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py 
b/pulsar-functions/instance/src/main/python/python_instance.py
index ec85af1..0fc3601 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -211,12 +211,9 @@ class PythonInstance(object):
 
           # stop timer for process time
           self.stats.process_time_end()
-
-          # incr total processed stat
-          self.stats.incr_total_processed()
         except Exception as e:
           Log.exception("Exception while executing user method")
-          self.stats.incr_total_user_exceptions();
+          self.stats.incr_total_user_exceptions()
 
         if self.log_topic_handler is not None:
           log.remove_all_handlers()
@@ -292,7 +289,6 @@ class PythonInstance(object):
   def get_metrics(self):
 
     total_received =  self.stats.get_total_received()
-    total_processed = self.stats.get_total_processed()
     total_processed_successfully = 
self.stats.get_total_processed_successfully()
     total_user_exceptions = self.stats.get_total_user_exceptions()
     total_sys_exceptions = self.stats.get_total_sys_exceptions()
@@ -300,7 +296,6 @@ class PythonInstance(object):
     last_invocation = self.stats.get_last_invocation()
 
     total_received_1min = self.stats.get_total_received_1min()
-    total_processed_1min = self.stats.get_total_processed_1min()
     total_processed_successfully_1min = 
self.stats.get_total_processed_successfully_1min()
     total_user_exceptions_1min = self.stats.get_total_user_exceptions_1min()
     total_sys_exceptions_1min = self.stats.get_total_sys_exceptions_1min()
@@ -309,7 +304,6 @@ class PythonInstance(object):
     metrics_data = InstanceCommunication_pb2.MetricsData()
     # total metrics
     metrics_data.receivedTotal = int(total_received) if sys.version_info.major 
>= 3 else long(total_received)
-    metrics_data.processedTotal = int(total_processed) if 
sys.version_info.major >= 3 else long(total_processed)
     metrics_data.processedSuccessfullyTotal = 
int(total_processed_successfully) if sys.version_info.major >= 3 else 
long(total_processed_successfully)
     metrics_data.systemExceptionsTotal = int(total_sys_exceptions) if 
sys.version_info.major >= 3 else long(total_sys_exceptions)
     metrics_data.userExceptionsTotal = int(total_user_exceptions) if 
sys.version_info.major >= 3 else long(total_user_exceptions)
@@ -317,7 +311,6 @@ class PythonInstance(object):
     metrics_data.lastInvocation = int(last_invocation) if 
sys.version_info.major >= 3 else long(last_invocation)
     # 1min metrics
     metrics_data.receivedTotal_1min = int(total_received_1min) if 
sys.version_info.major >= 3 else long(total_received_1min)
-    metrics_data.processedTotal_1min = int(total_processed_1min) if 
sys.version_info.major >= 3 else long(total_processed_1min)
     metrics_data.processedSuccessfullyTotal_1min = int(
       total_processed_successfully_1min) if sys.version_info.major >= 3 else 
long(total_processed_successfully_1min)
     metrics_data.systemExceptionsTotal_1min = int(total_sys_exceptions_1min) 
if sys.version_info.major >= 3 else long(
@@ -343,14 +336,14 @@ class PythonInstance(object):
     status = InstanceCommunication_pb2.FunctionStatus()
     status.running = True
 
-    total_processed = self.stats.get_total_processed()
+    total_received = self.stats.get_total_received()
     total_processed_successfully = 
self.stats.get_total_processed_successfully()
     total_user_exceptions = self.stats.get_total_user_exceptions()
     total_sys_exceptions = self.stats.get_total_sys_exceptions()
     avg_process_latency_ms = self.stats.get_avg_process_latency()
     last_invocation = self.stats.get_last_invocation()
 
-    status.numProcessed = int(total_processed) if sys.version_info.major >= 3 
else long(total_processed)
+    status.numReceived = int(total_received) if sys.version_info.major >= 3 
else long(total_received)
     status.numSuccessfullyProcessed = int(total_processed_successfully) if 
sys.version_info.major >= 3 else long(total_processed_successfully)
     status.numUserExceptions = int(total_user_exceptions) if 
sys.version_info.major >= 3 else long(total_user_exceptions)
     status.instanceId = self.instance_config.instance_id
diff --git a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto 
b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
index 05d1399..e5359f8 100644
--- a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
+++ b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
@@ -32,7 +32,8 @@ message FunctionStatus {
     bool running = 1;
     string failureException = 2;
     int64 numRestarts = 3;
-    int64 numProcessed = 4;
+    // int64 numProcessed = 4;
+    int64 numReceived = 17;
     int64 numSuccessfullyProcessed = 5;
     int64 numUserExceptions = 6;
     repeated ExceptionInformation latestUserExceptions = 7;
@@ -74,9 +75,10 @@ message MetricsData {
     int64 receivedTotal_1min = 10;
 
     // Total number of records processed
-    int64 processedTotal = 3;
+    // No longer used because processedSuccessfullyTotal and 
userExceptionsTotal add to it
+    // int64 processedTotal = 3;
 
-    int64 processedTotal_1min = 11;
+    // int64 processedTotal_1min = 11;
 
     // Total number of records successfully processed by user function
     int64 processedSuccessfullyTotal = 4;
@@ -124,4 +126,4 @@ message Metrics {
                MetricsData metricsData = 3;
        }
        repeated InstanceMetrics metrics = 1;
-}
\ No newline at end of file
+}
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
index 77228ac..2b8e0fd 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
@@ -71,7 +71,6 @@ public class FunctionsStatsGenerator {
                             metric(out, cluster, qualifiedNamespace, name, 
FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + 
FunctionStatsManager.PROCESS_LATENCY_MS, instanceId, 
metrics.getAvgProcessLatency());
                             metric(out, cluster, qualifiedNamespace, name, 
FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + 
FunctionStatsManager.LAST_INVOCATION, instanceId, metrics.getLastInvocation());
                             metric(out, cluster, qualifiedNamespace, name, 
FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + 
FunctionStatsManager.PROCESSED_SUCCESSFULLY_TOTAL, instanceId, 
metrics.getProcessedSuccessfullyTotal());
-                            metric(out, cluster, qualifiedNamespace, name, 
FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + 
FunctionStatsManager.PROCESSED_TOTAL, instanceId, metrics.getProcessedTotal());
                             metric(out, cluster, qualifiedNamespace, name, 
FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + 
FunctionStatsManager.RECEIVED_TOTAL, instanceId, metrics.getReceivedTotal());
                             metric(out, cluster, qualifiedNamespace, name, 
FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + 
FunctionStatsManager.SYSTEM_EXCEPTIONS_TOTAL, instanceId, 
metrics.getSystemExceptionsTotal());
                             metric(out, cluster, qualifiedNamespace, name, 
FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + 
FunctionStatsManager.USER_EXCEPTIONS_TOTAL, instanceId, 
metrics.getUserExceptionsTotal());
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
index efc2974..4b54bcf 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
@@ -88,7 +88,6 @@ public class FunctionStatsGeneratorTest {
         CompletableFuture<InstanceCommunication.MetricsData> 
metricsDataCompletableFuture = new CompletableFuture<>();
         InstanceCommunication.MetricsData metricsData = 
InstanceCommunication.MetricsData.newBuilder()
                 .setReceivedTotal(101)
-                .setProcessedTotal(100)
                 .setProcessedSuccessfullyTotal(99)
                 .setAvgProcessLatency(10.0)
                 .setUserExceptionsTotal(3)
@@ -126,7 +125,7 @@ public class FunctionStatsGeneratorTest {
         buf.release();
         Map<String, Metric> metrics = parseMetrics(str);
 
-        Assert.assertEquals(metrics.size(), 7);
+        Assert.assertEquals(metrics.size(), 6);
 
         System.out.println("metrics: " + metrics);
         Metric m = metrics.get("pulsar_function_received_total");
@@ -136,13 +135,6 @@ public class FunctionStatsGeneratorTest {
         assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
         assertEquals(m.value, 101.0);
 
-        m = metrics.get("pulsar_function_processed_total");
-        assertEquals(m.tags.get("cluster"), "default");
-        assertEquals(m.tags.get("instanceId"), "0");
-        assertEquals(m.tags.get("name"), "func-1");
-        assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
-        assertEquals(m.value, 100.0);
-
         m = metrics.get("pulsar_function_user_exceptions_total");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 0bfeb90..e72963e 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -333,7 +333,7 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
             try {
                 ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
                 log.info("Get sink status : {}", result.getStdout());
-                if (result.getStdout().contains("\"numProcessed\": \"" + 
numMessages + "\"")) {
+                if (result.getStdout().contains("\"numSuccessfullyProcessed\": 
\"" + numMessages + "\"")) {
                     return;
                 }
             } catch (ContainerExecException e) {
@@ -532,7 +532,7 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
             try {
                 ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
                 log.info("Get source status : {}", result.getStdout());
-                if (result.getStdout().contains("\"numProcessed\": \"" + 
numMessages + "\"")) {
+                if (result.getStdout().contains("\"numSuccessfullyProcessed\": 
\"" + numMessages + "\"")) {
                     return;
                 }
             } catch (ContainerExecException e) {
@@ -561,7 +561,7 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
             try {
                 ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
                 log.info("Get sink status : {}", result.getStdout());
-                if (result.getStdout().contains("\"numProcessed\": \"" + 
numMessages + "\"")) {
+                if (result.getStdout().contains("\"numSuccessfullyProcessed\": 
\"" + numMessages + "\"")) {
                     return;
                 }
             } catch (ContainerExecException e) {
@@ -913,7 +913,6 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
             "--name", functionName
         );
         assertTrue(result.getStdout().contains("\"running\": true"));
-        assertTrue(result.getStdout().contains("\"numProcessed\": \"" + 
numMessages + "\""));
         assertTrue(result.getStdout().contains("\"numSuccessfullyProcessed\": 
\"" + numMessages + "\""));
     }
 

Reply via email to