This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch 
1274-messaging-protocol-is-not-overridden-when-importing-data
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to 
refs/heads/1274-messaging-protocol-is-not-overridden-when-importing-data by 
this push:
     new 467cbc032 Properly override messaging protocol when importing 
resources (#1274)
467cbc032 is described below

commit 467cbc032efe6db9361f7273662c98d15a0e0f1e
Author: Dominik Riemer <[email protected]>
AuthorDate: Sun Feb 12 13:28:30 2023 +0100

    Properly override messaging protocol when importing resources (#1274)
---
 .../export/resolver/AbstractResolver.java          |  7 +++
 .../export/resolver/AdapterResolver.java           |  3 +-
 .../export/resolver/DataSourceResolver.java        |  3 +-
 .../export/resolver/PipelineResolver.java          |  9 ++--
 .../export/utils/EventGroundingProcessor.java      | 60 +++++++++++++++++++---
 .../provider/StoredPipelineElementProvider.java    | 13 ++++-
 6 files changed, 77 insertions(+), 18 deletions(-)

diff --git 
a/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/AbstractResolver.java
 
b/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/AbstractResolver.java
index 5721dd5ce..d86acdba2 100644
--- 
a/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/AbstractResolver.java
+++ 
b/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/AbstractResolver.java
@@ -19,9 +19,11 @@
 package org.apache.streampipes.export.resolver;
 
 import org.apache.streampipes.commons.exceptions.ElementNotFoundException;
+import org.apache.streampipes.export.utils.EventGroundingProcessor;
 import org.apache.streampipes.export.utils.SerializationUtils;
 import org.apache.streampipes.model.assets.AssetLink;
 import org.apache.streampipes.model.export.ExportItem;
+import org.apache.streampipes.model.grounding.EventGrounding;
 import org.apache.streampipes.storage.api.INoSqlStorage;
 import org.apache.streampipes.storage.management.StorageDispatcher;
 
@@ -80,4 +82,9 @@ public abstract class AbstractResolver<T> {
   public abstract void writeDocument(String document) throws 
JsonProcessingException, DocumentConflictException;
 
   protected abstract T deserializeDocument(String document) throws 
JsonProcessingException;
+
+  protected void overrideProtocol(EventGrounding grounding) {
+    var newProtocol = new 
EventGroundingProcessor().applyOverride(grounding.getTransportProtocol());
+    grounding.setTransportProtocol(newProtocol);
+  }
 }
diff --git 
a/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/AdapterResolver.java
 
b/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/AdapterResolver.java
index 9c3c49ebe..c3227ae1e 100644
--- 
a/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/AdapterResolver.java
+++ 
b/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/AdapterResolver.java
@@ -19,7 +19,6 @@
 
 package org.apache.streampipes.export.resolver;
 
-import org.apache.streampipes.export.utils.EventGroundingProcessor;
 import org.apache.streampipes.export.utils.SerializationUtils;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.connect.adapter.AdapterStreamDescription;
@@ -64,7 +63,7 @@ public class AdapterResolver extends 
AbstractResolver<AdapterDescription> {
                             boolean overrideDocument) throws 
JsonProcessingException {
     var adapterDescription = deserializeDocument(document);
     if (overrideDocument) {
-      
EventGroundingProcessor.applyOverride(adapterDescription.getEventGrounding().getTransportProtocol());
+      overrideProtocol(adapterDescription.getEventGrounding());
     }
     
getNoSqlStore().getAdapterInstanceStorage().storeAdapter(adapterDescription);
   }
diff --git 
a/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/DataSourceResolver.java
 
b/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/DataSourceResolver.java
index 1df37785e..499c3503b 100644
--- 
a/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/DataSourceResolver.java
+++ 
b/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/DataSourceResolver.java
@@ -18,7 +18,6 @@
 
 package org.apache.streampipes.export.resolver;
 
-import org.apache.streampipes.export.utils.EventGroundingProcessor;
 import org.apache.streampipes.export.utils.SerializationUtils;
 import org.apache.streampipes.model.SpDataStream;
 import org.apache.streampipes.model.export.ExportItem;
@@ -58,7 +57,7 @@ public class DataSourceResolver extends 
AbstractResolver<SpDataStream> {
     var dataStream = deserializeDocument(document);
     if (overrideDocument) {
       if (dataStream.getEventGrounding() != null) {
-        
EventGroundingProcessor.applyOverride(dataStream.getEventGrounding().getTransportProtocol());
+        overrideProtocol(dataStream.getEventGrounding());
       }
     }
     getNoSqlStore().getDataStreamStorage().createElement(dataStream);
diff --git 
a/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/PipelineResolver.java
 
b/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/PipelineResolver.java
index ac82c7a24..9ba922136 100644
--- 
a/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/PipelineResolver.java
+++ 
b/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/PipelineResolver.java
@@ -18,7 +18,6 @@
 
 package org.apache.streampipes.export.resolver;
 
-import org.apache.streampipes.export.utils.EventGroundingProcessor;
 import org.apache.streampipes.export.utils.SerializationUtils;
 import org.apache.streampipes.model.SpDataSet;
 import org.apache.streampipes.model.export.ExportItem;
@@ -70,17 +69,17 @@ public class PipelineResolver extends 
AbstractResolver<Pipeline> {
     if (overrideDocument) {
       pipeline.setSepas(pipeline.getSepas().stream().peek(processor -> {
         processor.getInputStreams()
-            .forEach(is -> 
EventGroundingProcessor.applyOverride(is.getEventGrounding().getTransportProtocol()));
-        
EventGroundingProcessor.applyOverride(processor.getOutputStream().getEventGrounding().getTransportProtocol());
+            .forEach(is -> overrideProtocol(is.getEventGrounding()));
+        overrideProtocol(processor.getOutputStream().getEventGrounding());
       }).collect(Collectors.toList()));
 
       pipeline.setStreams(pipeline.getStreams().stream().peek(stream -> {
-        
EventGroundingProcessor.applyOverride(stream.getEventGrounding().getTransportProtocol());
+        overrideProtocol(stream.getEventGrounding());
       }).collect(Collectors.toList()));
 
       pipeline.setActions(pipeline.getActions().stream().peek(sink -> {
         sink.getInputStreams()
-            .forEach(is -> 
EventGroundingProcessor.applyOverride(is.getEventGrounding().getTransportProtocol()));
+            .forEach(is -> overrideProtocol(is.getEventGrounding()));
       }).collect(Collectors.toList()));
 
     }
diff --git 
a/streampipes-data-export/src/main/java/org/apache/streampipes/export/utils/EventGroundingProcessor.java
 
b/streampipes-data-export/src/main/java/org/apache/streampipes/export/utils/EventGroundingProcessor.java
index 15578de8e..b946e00dd 100644
--- 
a/streampipes-data-export/src/main/java/org/apache/streampipes/export/utils/EventGroundingProcessor.java
+++ 
b/streampipes-data-export/src/main/java/org/apache/streampipes/export/utils/EventGroundingProcessor.java
@@ -19,19 +19,65 @@
 package org.apache.streampipes.export.utils;
 
 import org.apache.streampipes.config.backend.BackendConfig;
+import org.apache.streampipes.model.config.SpProtocol;
 import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
 import org.apache.streampipes.model.grounding.MqttTransportProtocol;
+import org.apache.streampipes.model.grounding.NatsTransportProtocol;
 import org.apache.streampipes.model.grounding.TransportProtocol;
 
 public class EventGroundingProcessor {
 
-  public static void applyOverride(TransportProtocol protocol) {
-    if (protocol instanceof KafkaTransportProtocol) {
-      protocol.setBrokerHostname(BackendConfig.INSTANCE.getKafkaHost());
-      ((KafkaTransportProtocol) 
protocol).setKafkaPort(BackendConfig.INSTANCE.getKafkaPort());
-    } else if (protocol instanceof MqttTransportProtocol) {
-      protocol.setBrokerHostname(BackendConfig.INSTANCE.getMqttHost());
-      ((MqttTransportProtocol) 
protocol).setPort(BackendConfig.INSTANCE.getMqttPort());
+  SpProtocol configuredProtocol;
+  public EventGroundingProcessor() {
+    this.configuredProtocol = 
BackendConfig.INSTANCE.getMessagingSettings().getPrioritizedProtocols().get(0);
+  }
+
+  public TransportProtocol applyOverride(TransportProtocol protocol) {
+    var protocolOverride = getConfiguredTransportProtocol();
+    protocolOverride.setTopicDefinition(protocol.getTopicDefinition());
+    return protocol;
+  }
+
+  private TransportProtocol getConfiguredTransportProtocol() {
+    return initializeProtocol(configuredProtocol);
+  }
+
+  private TransportProtocol initializeProtocol(SpProtocol configuredProtocol) {
+    if (isProtocol(configuredProtocol, KafkaTransportProtocol.class)) {
+      return makeKafkaProtocol();
+    } else if (isProtocol(configuredProtocol, MqttTransportProtocol.class)) {
+      return makeMqttProtocol();
+    } else {
+      return makeNatsProtocol();
     }
   }
+
+  private boolean isProtocol(SpProtocol configuredProtocol,
+                             Class<?> actualProtocolClass) {
+    return 
configuredProtocol.getProtocolClass().equals(actualProtocolClass.getCanonicalName());
+  }
+
+
+  private KafkaTransportProtocol makeKafkaProtocol() {
+    var protocol = new KafkaTransportProtocol();
+    protocol.setBrokerHostname(BackendConfig.INSTANCE.getKafkaHost());
+    protocol.setKafkaPort(BackendConfig.INSTANCE.getKafkaPort());
+    return protocol;
+  }
+
+  private MqttTransportProtocol makeMqttProtocol() {
+    var protocol = new MqttTransportProtocol();
+    protocol.setBrokerHostname(BackendConfig.INSTANCE.getMqttHost());
+    protocol.setPort(BackendConfig.INSTANCE.getMqttPort());
+    return protocol;
+  }
+
+  private NatsTransportProtocol makeNatsProtocol() {
+    var protocol = new NatsTransportProtocol();
+    protocol.setBrokerHostname(BackendConfig.INSTANCE.getNatsHost());
+    protocol.setPort(BackendConfig.INSTANCE.getNatsPort());
+    return protocol;
+  }
+
+
 }
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/StoredPipelineElementProvider.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/StoredPipelineElementProvider.java
index ad5860711..9b616d2a6 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/StoredPipelineElementProvider.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/StoredPipelineElementProvider.java
@@ -23,6 +23,7 @@ import 
org.apache.streampipes.manager.storage.RunningPipelineElementStorage;
 import org.apache.streampipes.model.SpDataSet;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -32,11 +33,19 @@ import java.util.List;
 public class StoredPipelineElementProvider implements PipelineElementProvider {
   @Override
   public List<InvocableStreamPipesEntity> 
getProcessorsAndSinks(PipelineExecutionInfo executionInfo) {
-    return 
RunningPipelineElementStorage.runningProcessorsAndSinks.get(executionInfo.getPipelineId());
+    if 
(RunningPipelineElementStorage.runningProcessorsAndSinks.containsKey(executionInfo.getPipelineId()))
 {
+      return 
RunningPipelineElementStorage.runningProcessorsAndSinks.get(executionInfo.getPipelineId());
+    } else {
+      return new ArrayList<>();
+    }
   }
 
   @Override
   public List<SpDataSet> getDataSets(PipelineExecutionInfo executionInfo) {
-    return 
RunningPipelineElementStorage.runningDataSets.get(executionInfo.getPipelineId());
+    if 
(RunningPipelineElementStorage.runningDataSets.containsKey(executionInfo.getPipelineId()))
 {
+      return 
RunningPipelineElementStorage.runningDataSets.get(executionInfo.getPipelineId());
+    } else {
+      return new ArrayList<>();
+    }
   }
 }

Reply via email to