This is an automated email from the ASF dual-hosted git repository. riemer pushed a commit to branch fix-function-stream-update in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit 3adaec78e800941637411ed001f766ade61dbc66 Author: Dominik Riemer <[email protected]> AuthorDate: Tue Sep 17 21:56:16 2024 +0200 fix: Properly update output streams in functions --- .../function/StreamPipesFunctionHandler.java | 45 ++++++++++++---------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/StreamPipesFunctionHandler.java b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/StreamPipesFunctionHandler.java index aeda204da9..7b02dc4db4 100644 --- a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/StreamPipesFunctionHandler.java +++ b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/StreamPipesFunctionHandler.java @@ -62,27 +62,32 @@ public enum StreamPipesFunctionHandler { var client = new StreamPipesClientResolver().makeStreamPipesClientInstance(); functions.forEach(function -> { function.getFunctionConfig().getOutputDataStreams().values().forEach(ds -> { - DeclarersSingleton.getInstance().add(new IStreamPipesDataStream() { - @Override - public IDataStreamConfiguration declareConfig() { - return DataStreamConfiguration.create( - () -> this, - ds - ); + if (!DeclarersSingleton.getInstance().getDataStreams().containsKey(ds.getAppId())) { + DeclarersSingleton.getInstance().add(new IStreamPipesDataStream() { + @Override + public IDataStreamConfiguration declareConfig() { + return DataStreamConfiguration.create( + () -> this, + ds + ); + } + + @Override + public void executeStream() { + + } + + @Override + public boolean isExecutable() { + return false; + } + }); + if (client.streams().get(ds.getElementId()).isEmpty()) { + client.streams().create(ds); + } else { + client.streams().update(ds); } - - @Override - public void executeStream() { - - } - - @Override - public boolean isExecutable() { - return false; - } - }); - - client.streams().create(ds); + } }); }); }
