This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new ff21060192 fix: Properly update output streams in functions (#3238)
ff21060192 is described below

commit ff210601927a62d6346c0c1553acb001f274f62f
Author: Dominik Riemer <[email protected]>
AuthorDate: Wed Sep 25 17:09:21 2024 +0200

    fix: Properly update output streams in functions (#3238)
---
 .../function/StreamPipesFunctionHandler.java       | 45 ++++++++++++----------
 1 file changed, 25 insertions(+), 20 deletions(-)

diff --git a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/StreamPipesFunctionHandler.java b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/StreamPipesFunctionHandler.java
index aeda204da9..7b02dc4db4 100644
--- a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/StreamPipesFunctionHandler.java
+++ b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/StreamPipesFunctionHandler.java
@@ -62,27 +62,32 @@ public enum StreamPipesFunctionHandler {
     var client = new StreamPipesClientResolver().makeStreamPipesClientInstance();
     functions.forEach(function -> {
       function.getFunctionConfig().getOutputDataStreams().values().forEach(ds -> {
-        DeclarersSingleton.getInstance().add(new IStreamPipesDataStream() {
-          @Override
-          public IDataStreamConfiguration declareConfig() {
-            return DataStreamConfiguration.create(
-                () -> this,
-                ds
-            );
+        if (!DeclarersSingleton.getInstance().getDataStreams().containsKey(ds.getAppId())) {
+          DeclarersSingleton.getInstance().add(new IStreamPipesDataStream() {
+            @Override
+            public IDataStreamConfiguration declareConfig() {
+              return DataStreamConfiguration.create(
+                  () -> this,
+                  ds
+              );
+            }
+
+            @Override
+            public void executeStream() {
+
+            }
+
+            @Override
+            public boolean isExecutable() {
+              return false;
+            }
+          });
+          if (client.streams().get(ds.getElementId()).isEmpty()) {
+            client.streams().create(ds);
+          } else {
+            client.streams().update(ds);
           }
-
-          @Override
-          public void executeStream() {
-
-          }
-
-          @Override
-          public boolean isExecutable() {
-            return false;
-          }
-        });
-
-        client.streams().create(ds);
+        }
       });
     });
   }

Reply via email to