This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new ff21060192 fix: Properly update output streams in functions (#3238)
ff21060192 is described below
commit ff210601927a62d6346c0c1553acb001f274f62f
Author: Dominik Riemer <[email protected]>
AuthorDate: Wed Sep 25 17:09:21 2024 +0200
fix: Properly update output streams in functions (#3238)
---
.../function/StreamPipesFunctionHandler.java | 45 ++++++++++++----------
1 file changed, 25 insertions(+), 20 deletions(-)
diff --git a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/StreamPipesFunctionHandler.java b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/StreamPipesFunctionHandler.java
index aeda204da9..7b02dc4db4 100644
--- a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/StreamPipesFunctionHandler.java
+++ b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/StreamPipesFunctionHandler.java
@@ -62,27 +62,32 @@ public enum StreamPipesFunctionHandler {
var client = new
StreamPipesClientResolver().makeStreamPipesClientInstance();
functions.forEach(function -> {
function.getFunctionConfig().getOutputDataStreams().values().forEach(ds
-> {
- DeclarersSingleton.getInstance().add(new IStreamPipesDataStream() {
- @Override
- public IDataStreamConfiguration declareConfig() {
- return DataStreamConfiguration.create(
- () -> this,
- ds
- );
+ if
(!DeclarersSingleton.getInstance().getDataStreams().containsKey(ds.getAppId()))
{
+ DeclarersSingleton.getInstance().add(new IStreamPipesDataStream() {
+ @Override
+ public IDataStreamConfiguration declareConfig() {
+ return DataStreamConfiguration.create(
+ () -> this,
+ ds
+ );
+ }
+
+ @Override
+ public void executeStream() {
+
+ }
+
+ @Override
+ public boolean isExecutable() {
+ return false;
+ }
+ });
+ if (client.streams().get(ds.getElementId()).isEmpty()) {
+ client.streams().create(ds);
+ } else {
+ client.streams().update(ds);
}
-
- @Override
- public void executeStream() {
-
- }
-
- @Override
- public boolean isExecutable() {
- return false;
- }
- });
-
- client.streams().create(ds);
+ }
});
});
}