This is an automated email from the ASF dual-hosted git repository.

oehler pushed a commit to branch improve-streampipes-function-monitoring
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit 7c19394d7c2f624acab967758015770597594dbf
Author: Sven Oehler <[email protected]>
AuthorDate: Fri Mar 20 13:29:37 2026 +0100

    Improve StreamPipesFunction monitoring
---
 .../standalone/function/StreamPipesFunction.java   | 22 +++++++++++++++++-----
 1 file changed, 17 insertions(+), 5 deletions(-)

diff --git 
a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
 
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
index fadf1fdff7..5878d25087 100644
--- 
a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
+++ 
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
@@ -104,9 +104,13 @@ public abstract class StreamPipesFunction implements 
IStreamPipesFunctionDeclare
   public void discardRuntime() {
     var functionId = this.getFunctionConfig().getFunctionId();
     LOG.info("Discarding function {}:{}", functionId.getId(), 
functionId.getVersion());
-    onServiceStopped();
-    unregisterConsumers();
     this.outputCollectors.forEach((key, value) -> value.disconnect());
+    unregisterConsumers();
+    try {
+      onServiceStopped();
+    } catch (Exception e) {
+      throw new SpRuntimeException("Custom stop behaviour failed with: " + e);
+    }
   }
 
   @Override
@@ -116,7 +120,7 @@ public abstract class StreamPipesFunction implements 
IStreamPipesFunctionDeclare
       var event = EventFactory
           .fromMap(rawEvent, sourceInfo, schemaInfoMapper.get(topicName));
       this.onEvent(event, sourceInfo.getSourceId());
-      increaseCounter(sourceInfo.getSourceId(), size);
+      increaseInCounter(sourceInfo.getSourceId(), size);
     } catch (RuntimeException e) {
       addError(e);
     }
@@ -126,7 +130,7 @@ public abstract class StreamPipesFunction implements 
IStreamPipesFunctionDeclare
     return 
stream.getEventGrounding().getTransportProtocol().getTopicDefinition().getActualTopicName();
   }
 
-  private void increaseCounter(String sourceInfo, long size) {
+  protected void increaseInCounter(String sourceInfo, long size) {
     var functionId = this.getFunctionConfig().getFunctionId();
     SpMonitoringManager.INSTANCE.increaseInCounter(
         functionId.getId(),
@@ -136,7 +140,15 @@ public abstract class StreamPipesFunction implements 
IStreamPipesFunctionDeclare
     );
   }
 
-  private void addError(RuntimeException e) {
+  protected void increaseOutCounter(long size) {
+    var functionId = this.getFunctionConfig().getFunctionId();
+    SpMonitoringManager.INSTANCE.increaseOutCounter(
+            functionId.getId(),
+            size,
+            System.currentTimeMillis());
+  }
+
+  protected void addError(RuntimeException e) {
     var functionId = this.getFunctionConfig().getFunctionId();
     SpMonitoringManager.INSTANCE.addErrorMessage(
         functionId.getId(),

Reply via email to