This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch
1274-messaging-protocol-is-not-overridden-when-importing-data
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to
refs/heads/1274-messaging-protocol-is-not-overridden-when-importing-data by
this push:
new 467cbc032 Properly override messaging protocol when importing
resources (#1274)
467cbc032 is described below
commit 467cbc032efe6db9361f7273662c98d15a0e0f1e
Author: Dominik Riemer <[email protected]>
AuthorDate: Sun Feb 12 13:28:30 2023 +0100
Properly override messaging protocol when importing resources (#1274)
---
.../export/resolver/AbstractResolver.java | 7 +++
.../export/resolver/AdapterResolver.java | 3 +-
.../export/resolver/DataSourceResolver.java | 3 +-
.../export/resolver/PipelineResolver.java | 9 ++--
.../export/utils/EventGroundingProcessor.java | 60 +++++++++++++++++++---
.../provider/StoredPipelineElementProvider.java | 13 ++++-
6 files changed, 77 insertions(+), 18 deletions(-)
diff --git
a/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/AbstractResolver.java
b/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/AbstractResolver.java
index 5721dd5ce..d86acdba2 100644
---
a/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/AbstractResolver.java
+++
b/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/AbstractResolver.java
@@ -19,9 +19,11 @@
package org.apache.streampipes.export.resolver;
import org.apache.streampipes.commons.exceptions.ElementNotFoundException;
+import org.apache.streampipes.export.utils.EventGroundingProcessor;
import org.apache.streampipes.export.utils.SerializationUtils;
import org.apache.streampipes.model.assets.AssetLink;
import org.apache.streampipes.model.export.ExportItem;
+import org.apache.streampipes.model.grounding.EventGrounding;
import org.apache.streampipes.storage.api.INoSqlStorage;
import org.apache.streampipes.storage.management.StorageDispatcher;
@@ -80,4 +82,9 @@ public abstract class AbstractResolver<T> {
public abstract void writeDocument(String document) throws
JsonProcessingException, DocumentConflictException;
protected abstract T deserializeDocument(String document) throws
JsonProcessingException;
+
+ protected void overrideProtocol(EventGrounding grounding) {
+ var newProtocol = new
EventGroundingProcessor().applyOverride(grounding.getTransportProtocol());
+ grounding.setTransportProtocol(newProtocol);
+ }
}
diff --git
a/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/AdapterResolver.java
b/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/AdapterResolver.java
index 9c3c49ebe..c3227ae1e 100644
---
a/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/AdapterResolver.java
+++
b/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/AdapterResolver.java
@@ -19,7 +19,6 @@
package org.apache.streampipes.export.resolver;
-import org.apache.streampipes.export.utils.EventGroundingProcessor;
import org.apache.streampipes.export.utils.SerializationUtils;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.connect.adapter.AdapterStreamDescription;
@@ -64,7 +63,7 @@ public class AdapterResolver extends
AbstractResolver<AdapterDescription> {
boolean overrideDocument) throws
JsonProcessingException {
var adapterDescription = deserializeDocument(document);
if (overrideDocument) {
-
EventGroundingProcessor.applyOverride(adapterDescription.getEventGrounding().getTransportProtocol());
+ overrideProtocol(adapterDescription.getEventGrounding());
}
getNoSqlStore().getAdapterInstanceStorage().storeAdapter(adapterDescription);
}
diff --git
a/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/DataSourceResolver.java
b/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/DataSourceResolver.java
index 1df37785e..499c3503b 100644
---
a/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/DataSourceResolver.java
+++
b/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/DataSourceResolver.java
@@ -18,7 +18,6 @@
package org.apache.streampipes.export.resolver;
-import org.apache.streampipes.export.utils.EventGroundingProcessor;
import org.apache.streampipes.export.utils.SerializationUtils;
import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.export.ExportItem;
@@ -58,7 +57,7 @@ public class DataSourceResolver extends
AbstractResolver<SpDataStream> {
var dataStream = deserializeDocument(document);
if (overrideDocument) {
if (dataStream.getEventGrounding() != null) {
-
EventGroundingProcessor.applyOverride(dataStream.getEventGrounding().getTransportProtocol());
+ overrideProtocol(dataStream.getEventGrounding());
}
}
getNoSqlStore().getDataStreamStorage().createElement(dataStream);
diff --git
a/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/PipelineResolver.java
b/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/PipelineResolver.java
index ac82c7a24..9ba922136 100644
---
a/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/PipelineResolver.java
+++
b/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/PipelineResolver.java
@@ -18,7 +18,6 @@
package org.apache.streampipes.export.resolver;
-import org.apache.streampipes.export.utils.EventGroundingProcessor;
import org.apache.streampipes.export.utils.SerializationUtils;
import org.apache.streampipes.model.SpDataSet;
import org.apache.streampipes.model.export.ExportItem;
@@ -70,17 +69,17 @@ public class PipelineResolver extends
AbstractResolver<Pipeline> {
if (overrideDocument) {
pipeline.setSepas(pipeline.getSepas().stream().peek(processor -> {
processor.getInputStreams()
- .forEach(is ->
EventGroundingProcessor.applyOverride(is.getEventGrounding().getTransportProtocol()));
-
EventGroundingProcessor.applyOverride(processor.getOutputStream().getEventGrounding().getTransportProtocol());
+ .forEach(is -> overrideProtocol(is.getEventGrounding()));
+ overrideProtocol(processor.getOutputStream().getEventGrounding());
}).collect(Collectors.toList()));
pipeline.setStreams(pipeline.getStreams().stream().peek(stream -> {
-
EventGroundingProcessor.applyOverride(stream.getEventGrounding().getTransportProtocol());
+ overrideProtocol(stream.getEventGrounding());
}).collect(Collectors.toList()));
pipeline.setActions(pipeline.getActions().stream().peek(sink -> {
sink.getInputStreams()
- .forEach(is ->
EventGroundingProcessor.applyOverride(is.getEventGrounding().getTransportProtocol()));
+ .forEach(is -> overrideProtocol(is.getEventGrounding()));
}).collect(Collectors.toList()));
}
diff --git
a/streampipes-data-export/src/main/java/org/apache/streampipes/export/utils/EventGroundingProcessor.java
b/streampipes-data-export/src/main/java/org/apache/streampipes/export/utils/EventGroundingProcessor.java
index 15578de8e..b946e00dd 100644
---
a/streampipes-data-export/src/main/java/org/apache/streampipes/export/utils/EventGroundingProcessor.java
+++
b/streampipes-data-export/src/main/java/org/apache/streampipes/export/utils/EventGroundingProcessor.java
@@ -19,19 +19,65 @@
package org.apache.streampipes.export.utils;
import org.apache.streampipes.config.backend.BackendConfig;
+import org.apache.streampipes.model.config.SpProtocol;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
import org.apache.streampipes.model.grounding.MqttTransportProtocol;
+import org.apache.streampipes.model.grounding.NatsTransportProtocol;
import org.apache.streampipes.model.grounding.TransportProtocol;
public class EventGroundingProcessor {
- public static void applyOverride(TransportProtocol protocol) {
- if (protocol instanceof KafkaTransportProtocol) {
- protocol.setBrokerHostname(BackendConfig.INSTANCE.getKafkaHost());
- ((KafkaTransportProtocol)
protocol).setKafkaPort(BackendConfig.INSTANCE.getKafkaPort());
- } else if (protocol instanceof MqttTransportProtocol) {
- protocol.setBrokerHostname(BackendConfig.INSTANCE.getMqttHost());
- ((MqttTransportProtocol)
protocol).setPort(BackendConfig.INSTANCE.getMqttPort());
+ SpProtocol configuredProtocol;
+ public EventGroundingProcessor() {
+ this.configuredProtocol =
BackendConfig.INSTANCE.getMessagingSettings().getPrioritizedProtocols().get(0);
+ }
+
+ public TransportProtocol applyOverride(TransportProtocol protocol) {
+ var protocolOverride = getConfiguredTransportProtocol();
+ protocolOverride.setTopicDefinition(protocol.getTopicDefinition());
+ return protocol;
+ }
+
+ private TransportProtocol getConfiguredTransportProtocol() {
+ return initializeProtocol(configuredProtocol);
+ }
+
+ private TransportProtocol initializeProtocol(SpProtocol configuredProtocol) {
+ if (isProtocol(configuredProtocol, KafkaTransportProtocol.class)) {
+ return makeKafkaProtocol();
+ } else if (isProtocol(configuredProtocol, MqttTransportProtocol.class)) {
+ return makeMqttProtocol();
+ } else {
+ return makeNatsProtocol();
}
}
+
+ private boolean isProtocol(SpProtocol configuredProtocol,
+ Class<?> actualProtocolClass) {
+ return
configuredProtocol.getProtocolClass().equals(actualProtocolClass.getCanonicalName());
+ }
+
+
+ private KafkaTransportProtocol makeKafkaProtocol() {
+ var protocol = new KafkaTransportProtocol();
+ protocol.setBrokerHostname(BackendConfig.INSTANCE.getKafkaHost());
+ protocol.setKafkaPort(BackendConfig.INSTANCE.getKafkaPort());
+ return protocol;
+ }
+
+ private MqttTransportProtocol makeMqttProtocol() {
+ var protocol = new MqttTransportProtocol();
+ protocol.setBrokerHostname(BackendConfig.INSTANCE.getMqttHost());
+ protocol.setPort(BackendConfig.INSTANCE.getMqttPort());
+ return protocol;
+ }
+
+ private NatsTransportProtocol makeNatsProtocol() {
+ var protocol = new NatsTransportProtocol();
+ protocol.setBrokerHostname(BackendConfig.INSTANCE.getNatsHost());
+ protocol.setPort(BackendConfig.INSTANCE.getNatsPort());
+ return protocol;
+ }
+
+
}
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/StoredPipelineElementProvider.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/StoredPipelineElementProvider.java
index ad5860711..9b616d2a6 100644
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/StoredPipelineElementProvider.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/StoredPipelineElementProvider.java
@@ -23,6 +23,7 @@ import
org.apache.streampipes.manager.storage.RunningPipelineElementStorage;
import org.apache.streampipes.model.SpDataSet;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import java.util.ArrayList;
import java.util.List;
/**
@@ -32,11 +33,19 @@ import java.util.List;
public class StoredPipelineElementProvider implements PipelineElementProvider {
@Override
public List<InvocableStreamPipesEntity>
getProcessorsAndSinks(PipelineExecutionInfo executionInfo) {
- return
RunningPipelineElementStorage.runningProcessorsAndSinks.get(executionInfo.getPipelineId());
+ if
(RunningPipelineElementStorage.runningProcessorsAndSinks.containsKey(executionInfo.getPipelineId()))
{
+ return
RunningPipelineElementStorage.runningProcessorsAndSinks.get(executionInfo.getPipelineId());
+ } else {
+ return new ArrayList<>();
+ }
}
@Override
public List<SpDataSet> getDataSets(PipelineExecutionInfo executionInfo) {
- return
RunningPipelineElementStorage.runningDataSets.get(executionInfo.getPipelineId());
+ if
(RunningPipelineElementStorage.runningDataSets.containsKey(executionInfo.getPipelineId()))
{
+ return
RunningPipelineElementStorage.runningDataSets.get(executionInfo.getPipelineId());
+ } else {
+ return new ArrayList<>();
+ }
}
}