This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch 1614-migrate-data-sinks-in-brokers-jvm-module
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to
refs/heads/1614-migrate-data-sinks-in-brokers-jvm-module by this push:
new 42617e3ba Migrate data sinks in brokers-jvm module (#1614)
42617e3ba is described below
commit 42617e3ba29a25e288486d5ca3d6adeb60e9d735
Author: Dominik Riemer <[email protected]>
AuthorDate: Fri May 26 17:23:09 2023 +0200
Migrate data sinks in brokers-jvm module (#1614)
---
.../sinks/brokers/jvm/BrokersJvmInit.java | 16 +--
.../sinks/brokers/jvm/bufferrest/BufferRest.java | 84 ---------------
.../jvm/bufferrest/BufferRestParameters.java | 51 ---------
...ontroller.java => BufferRestPublisherSink.java} | 68 ++++++++++--
.../sinks/brokers/jvm/jms/JmsParameters.java | 48 ---------
.../sinks/brokers/jvm/jms/JmsPublisher.java | 70 -------------
.../{JmsController.java => JmsPublisherSink.java} | 61 +++++++----
.../sinks/brokers/jvm/nats/NatsController.java | 76 +++++++++++---
.../sinks/brokers/jvm/nats/NatsParameters.java | 38 -------
.../sinks/brokers/jvm/nats/NatsPublisher.java | 83 ---------------
.../brokers/jvm/rabbitmq/RabbitMqConsumer.java | 69 -------------
.../brokers/jvm/rabbitmq/RabbitMqController.java | 110 --------------------
.../brokers/jvm/rabbitmq/RabbitMqParameters.java | 8 +-
.../jvm/rabbitmq/RabbitMqPublisherSink.java | 115 +++++++++++++++++++++
.../sinks/brokers/jvm/rest/RestParameters.java | 36 -------
.../sinks/brokers/jvm/rest/RestPublisher.java | 71 -------------
.../rest/{RestController.java => RestSink.java} | 56 ++++++++--
17 files changed, 329 insertions(+), 731 deletions(-)
diff --git
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java
index 1e07fc515..f3920b0e6 100644
---
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java
+++
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java
@@ -28,14 +28,14 @@ import
org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
import org.apache.streampipes.service.extensions.ExtensionsModelSubmitter;
-import
org.apache.streampipes.sinks.brokers.jvm.bufferrest.BufferRestController;
-import org.apache.streampipes.sinks.brokers.jvm.jms.JmsController;
+import org.apache.streampipes.sinks.brokers.jvm.bufferrest.BufferRestPublisherSink;
+import org.apache.streampipes.sinks.brokers.jvm.jms.JmsPublisherSink;
import org.apache.streampipes.sinks.brokers.jvm.kafka.KafkaPublishSink;
import org.apache.streampipes.sinks.brokers.jvm.mqtt.MqttPublisherSink;
import org.apache.streampipes.sinks.brokers.jvm.nats.NatsController;
import org.apache.streampipes.sinks.brokers.jvm.pulsar.PulsarPublisherSink;
-import org.apache.streampipes.sinks.brokers.jvm.rabbitmq.RabbitMqController;
-import org.apache.streampipes.sinks.brokers.jvm.rest.RestController;
+import org.apache.streampipes.sinks.brokers.jvm.rabbitmq.RabbitMqPublisherSink;
+import org.apache.streampipes.sinks.brokers.jvm.rest.RestSink;
import org.apache.streampipes.sinks.brokers.jvm.rocketmq.RocketMQPublisherSink;
import org.apache.streampipes.sinks.brokers.jvm.tubemq.TubeMQPublisherSink;
import org.apache.streampipes.sinks.brokers.jvm.websocket.WebsocketServerSink;
@@ -54,10 +54,10 @@ public class BrokersJvmInit extends
ExtensionsModelSubmitter {
8096)
.registerPipelineElements(
new KafkaPublishSink(),
- new JmsController(),
- new RestController(),
- new BufferRestController(),
- new RabbitMqController(),
+ new JmsPublisherSink(),
+ new RestSink(),
+ new BufferRestPublisherSink(),
+ new RabbitMqPublisherSink(),
new MqttPublisherSink(),
new WebsocketServerSink(),
new PulsarPublisherSink(),
diff --git
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/bufferrest/BufferRest.java
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/bufferrest/BufferRest.java
deleted file mode 100644
index 20ff4490f..000000000
---
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/bufferrest/BufferRest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.sinks.brokers.jvm.bufferrest;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.dataformat.SpDataFormatDefinition;
-import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
-import org.apache.streampipes.model.runtime.Event;
-import
org.apache.streampipes.sinks.brokers.jvm.bufferrest.buffer.BufferListener;
-import
org.apache.streampipes.sinks.brokers.jvm.bufferrest.buffer.MessageBuffer;
-import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
-import org.apache.streampipes.wrapper.runtime.EventSink;
-
-import org.apache.commons.io.Charsets;
-import org.apache.http.client.fluent.Request;
-import org.apache.http.entity.StringEntity;
-import org.slf4j.Logger;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-
-public class BufferRest implements EventSink<BufferRestParameters>,
BufferListener {
-
- private static Logger log;
- private List<String> fieldsToSend;
- private SpDataFormatDefinition dataFormatDefinition;
- private String restEndpointURI;
- private MessageBuffer buffer;
-
- public BufferRest() {
- this.dataFormatDefinition = new JsonDataFormatDefinition();
- }
-
- @Override
- public void onInvocation(BufferRestParameters parameters,
EventSinkRuntimeContext runtimeContext) {
- this.fieldsToSend = parameters.getFieldsToSend();
- this.restEndpointURI = parameters.getRestEndpointURI();
- this.buffer = new MessageBuffer(parameters.getBufferSize());
- this.buffer.addListener(this);
- }
-
- @Override
- public void onEvent(Event event) {
- Map<String, Object> outEventMap = event.getSubset(fieldsToSend).getRaw();
- try {
- String json = new String(dataFormatDefinition.fromMap(outEventMap));
- this.buffer.addMessage(json);
- } catch (SpRuntimeException e) {
- log.error("Could not parse incoming event");
- }
- }
-
- @Override
- public void onDetach() {
- buffer.removeListener(this);
- }
-
- @Override
- public void bufferFull(String messagesJsonArray) {
- try {
- Request.Post(restEndpointURI).body(new StringEntity(messagesJsonArray,
Charsets.UTF_8)).execute();
- } catch (IOException e) {
- log.error("Could not reach endpoint at {}", restEndpointURI);
- }
- }
-}
diff --git
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/bufferrest/BufferRestParameters.java
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/bufferrest/BufferRestParameters.java
deleted file mode 100644
index ac675422f..000000000
---
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/bufferrest/BufferRestParameters.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.sinks.brokers.jvm.bufferrest;
-
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
-
-import java.util.List;
-
-public class BufferRestParameters extends EventSinkBindingParams {
-
- private String restEndpointURI;
- private List<String> fieldsToSend;
- private int bufferSize;
-
- public BufferRestParameters(DataSinkInvocation graph, List<String>
fieldsToSend, String restEndpointURI,
- int bufferSize) {
- super(graph);
- this.fieldsToSend = fieldsToSend;
- this.restEndpointURI = restEndpointURI;
- this.bufferSize = bufferSize;
- }
-
- public List<String> getFieldsToSend() {
- return fieldsToSend;
- }
-
- public String getRestEndpointURI() {
- return restEndpointURI;
- }
-
- public int getBufferSize() {
- return bufferSize;
- }
-}
diff --git
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/bufferrest/BufferRestController.java
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/bufferrest/BufferRestPublisherSink.java
similarity index 50%
rename from
streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/bufferrest/BufferRestController.java
rename to
streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/bufferrest/BufferRestPublisherSink.java
index a8cd11e31..7ae978afc 100644
---
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/bufferrest/BufferRestController.java
+++
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/bufferrest/BufferRestPublisherSink.java
@@ -18,28 +18,48 @@
package org.apache.streampipes.sinks.brokers.jvm.bufferrest;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.dataformat.SpDataFormatDefinition;
+import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
import org.apache.streampipes.model.DataSinkType;
import org.apache.streampipes.model.graph.DataSinkDescription;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventSink;
-import
org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventSinkDeclarer;
+import org.apache.streampipes.sinks.brokers.jvm.bufferrest.buffer.BufferListener;
+import org.apache.streampipes.sinks.brokers.jvm.bufferrest.buffer.MessageBuffer;
+import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
+import org.apache.streampipes.wrapper.standalone.SinkParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
+import org.apache.commons.io.Charsets;
+import org.apache.http.client.fluent.Request;
+import org.apache.http.entity.StringEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
import java.util.List;
+import java.util.Map;
+
+public class BufferRestPublisherSink extends StreamPipesDataSink implements BufferListener {
-public class BufferRestController extends
StandaloneEventSinkDeclarer<BufferRestParameters> {
+ private static final Logger LOG = LoggerFactory.getLogger(BufferRestPublisherSink.class);
private static final String KEY = "bufferrest";
private static final String URI = ".uri";
private static final String COUNT = ".count";
private static final String FIELDS = ".fields-to-send";
+ private List<String> fieldsToSend;
+ private SpDataFormatDefinition dataFormatDefinition;
+ private String restEndpointURI;
+ private MessageBuffer buffer;
+
@Override
public DataSinkDescription declareModel() {
return
DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.bufferrest")
@@ -58,15 +78,41 @@ public class BufferRestController extends
StandaloneEventSinkDeclarer<BufferRest
}
@Override
- public ConfiguredEventSink<BufferRestParameters>
onInvocation(DataSinkInvocation graph,
-
DataSinkParameterExtractor extractor) {
+ public void onInvocation(SinkParams parameters,
+ EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
- List<String> fieldsToSend = extractor.mappingPropertyValues(KEY + FIELDS);
- String restEndpointURI = extractor.singleValueParameter(KEY + URI,
String.class);
+ var extractor = parameters.extractor();
+ fieldsToSend = extractor.mappingPropertyValues(KEY + FIELDS);
+ restEndpointURI = extractor.singleValueParameter(KEY + URI, String.class);
int bufferSize = Integer.parseInt(extractor.singleValueParameter(KEY +
COUNT, String.class));
+ this.dataFormatDefinition = new JsonDataFormatDefinition();
- BufferRestParameters params = new BufferRestParameters(graph,
fieldsToSend, restEndpointURI, bufferSize);
+ this.buffer = new MessageBuffer(bufferSize);
+ this.buffer.addListener(this);
+ }
- return new ConfiguredEventSink<>(params, BufferRest::new);
+ @Override
+ public void onEvent(Event event) throws SpRuntimeException {
+ Map<String, Object> outEventMap = event.getSubset(fieldsToSend).getRaw();
+ try {
+ String json = new String(dataFormatDefinition.fromMap(outEventMap));
+ this.buffer.addMessage(json);
+ } catch (SpRuntimeException e) {
+ LOG.error("Could not parse incoming event");
+ }
+ }
+
+ @Override
+ public void onDetach() throws SpRuntimeException {
+ buffer.removeListener(this);
+ }
+
+ @Override
+ public void bufferFull(String messagesJsonArray) {
+ try {
+ Request.Post(restEndpointURI).body(new StringEntity(messagesJsonArray, Charsets.UTF_8)).execute();
+ } catch (IOException e) {
+ LOG.error("Could not reach endpoint at {}", restEndpointURI);
+ }
}
}
diff --git
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsParameters.java
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsParameters.java
deleted file mode 100644
index 1cc724461..000000000
---
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsParameters.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.sinks.brokers.jvm.jms;
-
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
-
-public class JmsParameters extends EventSinkBindingParams {
-
- private String jmsHost;
- private Integer jmsPort;
- private String topic;
-
- public JmsParameters(DataSinkInvocation graph, String jmsHost, Integer
jmsPort, String topic) {
- super(graph);
- this.jmsHost = jmsHost;
- this.jmsPort = jmsPort;
- this.topic = topic;
- }
-
- public String getJmsHost() {
- return jmsHost;
- }
-
- public Integer getJmsPort() {
- return jmsPort;
- }
-
- public String getTopic() {
- return topic;
- }
-}
diff --git
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsPublisher.java
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsPublisher.java
deleted file mode 100644
index 45107dd32..000000000
---
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsPublisher.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.sinks.brokers.jvm.jms;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
-import org.apache.streampipes.messaging.jms.ActiveMQPublisher;
-import org.apache.streampipes.model.grounding.JmsTransportProtocol;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
-import org.apache.streampipes.wrapper.runtime.EventSink;
-
-import java.util.Map;
-
-public class JmsPublisher implements EventSink<JmsParameters> {
-
- private ActiveMQPublisher publisher;
- private JsonDataFormatDefinition jsonDataFormatDefinition;
-
- public JmsPublisher() {
- this.jsonDataFormatDefinition = new JsonDataFormatDefinition();
- }
-
- @Override
- public void onInvocation(JmsParameters params, EventSinkRuntimeContext
runtimeContext) throws SpRuntimeException {
-// this.publisher = new ActiveMQPublisher(params.getJmsHost() + ":" +
params.getJmsPort(), params.getTopic());
- this.publisher = new ActiveMQPublisher();
- JmsTransportProtocol jmsTransportProtocol =
- new JmsTransportProtocol(params.getJmsHost(), params.getJmsPort(),
params.getTopic());
- this.publisher.connect(jmsTransportProtocol);
- if (!this.publisher.isConnected()) {
- throw new SpRuntimeException(
- "Could not connect to JMS server " + params.getJmsHost() + " on
Port: " + params.getJmsPort()
- + " to topic: " + params.getTopic());
- }
-
-
- }
-
- @Override
- public void onEvent(Event inputEvent) {
- try {
- Map<String, Object> event = inputEvent.getRaw();
- this.publisher.publish(jsonDataFormatDefinition.fromMap(event));
- } catch (SpRuntimeException e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public void onDetach() throws SpRuntimeException {
- this.publisher.disconnect();
- }
-}
diff --git
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsController.java
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsPublisherSink.java
similarity index 57%
rename from
streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsController.java
rename to
streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsPublisherSink.java
index 895b49e5d..0342cd6be 100644
---
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsController.java
+++
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsPublisherSink.java
@@ -19,28 +19,33 @@
package org.apache.streampipes.sinks.brokers.jvm.jms;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
+import org.apache.streampipes.messaging.jms.ActiveMQPublisher;
import org.apache.streampipes.model.DataSinkType;
import org.apache.streampipes.model.graph.DataSinkDescription;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.grounding.JmsTransportProtocol;
+import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventSink;
-import
org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventSinkDeclarer;
+import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
+import org.apache.streampipes.wrapper.standalone.SinkParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
-public class JmsController extends StandaloneEventSinkDeclarer<JmsParameters> {
+import java.util.Map;
+
+public class JmsPublisherSink extends StreamPipesDataSink {
- private static final String JMS_BROKER_SETTINGS_KEY = "broker-settings";
private static final String TOPIC_KEY = "topic";
private static final String HOST_KEY = "host";
private static final String PORT_KEY = "port";
-// private static final String JMS_HOST_URI = "http://schema.org/jmsHost";
-// private static final String JMS_PORT_URI = "http://schema.org/jmsPort";
+ private ActiveMQPublisher publisher;
+ private JsonDataFormatDefinition jsonDataFormatDefinition;
@Override
public DataSinkDescription declareModel() {
@@ -55,27 +60,43 @@ public class JmsController extends
StandaloneEventSinkDeclarer<JmsParameters> {
.requiredTextParameter(Labels.withId(TOPIC_KEY), false, false)
.requiredTextParameter(Labels.withId(HOST_KEY), false, false)
.requiredIntegerParameter(Labels.withId(PORT_KEY), 61616)
-// .requiredOntologyConcept(Labels.withId(JMS_BROKER_SETTINGS_KEY),
-// OntologyProperties.mandatory(JMS_HOST_URI),
-// OntologyProperties.mandatory(JMS_PORT_URI))
.build();
}
@Override
- public ConfiguredEventSink<JmsParameters> onInvocation(DataSinkInvocation
graph,
-
DataSinkParameterExtractor extractor) {
+ public void onInvocation(SinkParams parameters,
+ EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
- String topic = extractor.singleValueParameter(TOPIC_KEY, String.class);
+ var extractor = parameters.extractor();
+ this.jsonDataFormatDefinition = new JsonDataFormatDefinition();
String jmsHost = extractor.singleValueParameter(HOST_KEY, String.class);
Integer jmsPort = extractor.singleValueParameter(PORT_KEY, Integer.class);
-// String jmsHost =
extractor.supportedOntologyPropertyValue(JMS_BROKER_SETTINGS_KEY, JMS_HOST_URI,
-// String.class);
-// Integer jmsPort =
extractor.supportedOntologyPropertyValue(JMS_BROKER_SETTINGS_KEY, JMS_PORT_URI,
-// Integer.class);
+ String topic = extractor.singleValueParameter(TOPIC_KEY, String.class);
+
+ this.publisher = new ActiveMQPublisher();
+ JmsTransportProtocol jmsTransportProtocol =
+ new JmsTransportProtocol(jmsHost, jmsPort, topic);
+ this.publisher.connect(jmsTransportProtocol);
+ if (!this.publisher.isConnected()) {
+ throw new SpRuntimeException(
+ "Could not connect to JMS server " + jmsHost + " on Port: " + jmsPort
+ + " to topic: " + topic);
+ }
+ }
- JmsParameters params = new JmsParameters(graph, jmsHost, jmsPort, topic);
+ @Override
+ public void onEvent(Event inputEvent) throws SpRuntimeException {
+ try {
+ Map<String, Object> event = inputEvent.getRaw();
+ this.publisher.publish(jsonDataFormatDefinition.fromMap(event));
+ } catch (SpRuntimeException e) {
+ e.printStackTrace();
+ }
+ }
- return new ConfiguredEventSink<>(params, JmsPublisher::new);
+ @Override
+ public void onDetach() throws SpRuntimeException {
+ this.publisher.disconnect();
}
}
diff --git
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/nats/NatsController.java
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/nats/NatsController.java
index 767fd0e11..90bc8361f 100644
---
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/nats/NatsController.java
+++
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/nats/NatsController.java
@@ -18,26 +18,41 @@
package org.apache.streampipes.sinks.brokers.jvm.nats;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
+import org.apache.streampipes.messaging.nats.NatsUtils;
import org.apache.streampipes.model.DataSinkType;
import org.apache.streampipes.model.graph.DataSinkDescription;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
import org.apache.streampipes.model.nats.NatsConfig;
+import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
import org.apache.streampipes.pe.shared.config.nats.NatsConfigUtils;
import org.apache.streampipes.sdk.StaticProperties;
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
import org.apache.streampipes.sdk.helpers.Alternatives;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventSink;
-import
org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventSinkDeclarer;
+import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
+import org.apache.streampipes.wrapper.standalone.SinkParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
-public class NatsController extends
StandaloneEventSinkDeclarer<NatsParameters> {
+import io.nats.client.Connection;
+import io.nats.client.Nats;
+import io.nats.client.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
+public class NatsController extends StreamPipesDataSink {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NatsController.class);
private static final String SUBJECT_KEY = "subject";
private static final String URLS_KEY = "natsUrls";
@@ -55,6 +70,10 @@ public class NatsController extends
StandaloneEventSinkDeclarer<NatsParameters>
private static final String CONNECTION_PROPERTIES_GROUP = "connection-group";
private static final String PROPERTIES_KEY = "properties";
+ private String subject;
+ private Connection natsConnection;
+ private JsonDataFormatDefinition dataFormatDefinition;
+
@Override
public DataSinkDescription declareModel() {
return
DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.nats")
@@ -74,17 +93,6 @@ public class NatsController extends
StandaloneEventSinkDeclarer<NatsParameters>
.build();
}
- @Override
- public ConfiguredEventSink<NatsParameters> onInvocation(DataSinkInvocation
graph,
-
DataSinkParameterExtractor extractor) {
-
- NatsConfig natsConfig =
NatsConfigUtils.from(StaticPropertyExtractor.from(graph.getStaticProperties()));
-
- NatsParameters params = new NatsParameters(graph, natsConfig);
-
- return new ConfiguredEventSink<>(params, NatsPublisher::new);
- }
-
public static StaticPropertyAlternative getAccessModeAlternativesOne() {
return Alternatives.from(Labels.withId(ANONYMOUS_ACCESS));
@@ -109,4 +117,40 @@ public class NatsController extends
StandaloneEventSinkDeclarer<NatsParameters>
StaticProperties.stringFreeTextProperty(Labels.withId(PROPERTIES_KEY))));
}
+
+ @Override
+ public void onInvocation(SinkParams parameters,
+ EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
+ this.dataFormatDefinition = new JsonDataFormatDefinition();
+ NatsConfig natsConfig = NatsConfigUtils.from(
+ StaticPropertyExtractor.from(parameters.getGraph().getStaticProperties()));
+ this.subject = natsConfig.getSubject();
+ Options options = NatsUtils.makeNatsOptions(natsConfig);
+
+ try {
+ this.natsConnection = Nats.connect(options);
+ } catch (Exception e) {
+ LOG.error("Error when connecting to the Nats broker on " + natsConfig.getNatsUrls() + " . " + e);
+ }
+ }
+
+ @Override
+ public void onEvent(Event inputEvent) throws SpRuntimeException {
+ try {
+ Map<String, Object> event = inputEvent.getRaw();
+ natsConnection.publish(subject, dataFormatDefinition.fromMap(event));
+ } catch (SpRuntimeException e) {
+ LOG.error("Could not publish events to Nats broker. " + e);
+ }
+ }
+
+ @Override
+ public void onDetach() throws SpRuntimeException {
+ try {
+ natsConnection.flush(Duration.ofMillis(50));
+ natsConnection.close();
+ } catch (TimeoutException | InterruptedException e) {
+ LOG.error("Error when disconnecting with Nats broker. " + e);
+ }
+ }
}
diff --git
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/nats/NatsParameters.java
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/nats/NatsParameters.java
deleted file mode 100644
index 4f201f746..000000000
---
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/nats/NatsParameters.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.sinks.brokers.jvm.nats;
-
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.model.nats.NatsConfig;
-import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
-
-public class NatsParameters extends EventSinkBindingParams {
-
- private final NatsConfig natsConfig;
-
- public NatsParameters(DataSinkInvocation graph,
- NatsConfig natsConfig) {
- super(graph);
- this.natsConfig = natsConfig;
- }
-
- public NatsConfig getNatsConfig() {
- return natsConfig;
- }
-}
diff --git
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/nats/NatsPublisher.java
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/nats/NatsPublisher.java
deleted file mode 100644
index 41cffa4e7..000000000
---
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/nats/NatsPublisher.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.sinks.brokers.jvm.nats;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
-import org.apache.streampipes.logging.api.Logger;
-import org.apache.streampipes.messaging.nats.NatsUtils;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
-import org.apache.streampipes.wrapper.runtime.EventSink;
-
-import io.nats.client.Connection;
-import io.nats.client.Nats;
-import io.nats.client.Options;
-
-import java.time.Duration;
-import java.util.Map;
-import java.util.concurrent.TimeoutException;
-
-public class NatsPublisher implements EventSink<NatsParameters> {
-
- private String subject;
- private Connection natsConnection;
- private JsonDataFormatDefinition dataFormatDefinition;
- private static Logger log;
-
- public NatsPublisher() {
- this.dataFormatDefinition = new JsonDataFormatDefinition();
- }
-
- @Override
- public void onInvocation(NatsParameters parameters, EventSinkRuntimeContext
runtimeContext)
- throws SpRuntimeException {
-
- log = parameters.getGraph().getLogger(NatsPublisher.class);
- var natsConfig = parameters.getNatsConfig();
- this.subject = natsConfig.getSubject();
- Options options = NatsUtils.makeNatsOptions(natsConfig);
-
- try {
- this.natsConnection = Nats.connect(options);
- } catch (Exception e) {
- log.error("Error when connecting to the Nats broker on " +
natsConfig.getNatsUrls() + " . " + e.toString());
- }
- }
-
- @Override
- public void onEvent(Event inputEvent) {
- try {
- Map<String, Object> event = inputEvent.getRaw();
- natsConnection.publish(subject, dataFormatDefinition.fromMap(event));
- } catch (SpRuntimeException e) {
- log.error("Could not publish events to Nats broker. " + e.toString());
- }
- }
-
- @Override
- public void onDetach() throws SpRuntimeException {
- try {
- natsConnection.flush(Duration.ofMillis(50));
- natsConnection.close();
- } catch (TimeoutException | InterruptedException e) {
- log.error("Error when disconnecting with Nats broker. " + e.toString());
- }
- }
-}
diff --git
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqConsumer.java
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqConsumer.java
deleted file mode 100644
index 0856da542..000000000
---
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqConsumer.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.sinks.brokers.jvm.rabbitmq;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.pe.shared.PlaceholderExtractor;
-import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
-import org.apache.streampipes.wrapper.runtime.EventSink;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class RabbitMqConsumer implements EventSink<RabbitMqParameters> {
-
- private RabbitMqPublisher publisher;
- private JsonDataFormatDefinition dataFormatDefinition;
- private String topic;
-
- private static final Logger LOG =
LoggerFactory.getLogger(RabbitMqConsumer.class);
-
- public RabbitMqConsumer() {
- this.dataFormatDefinition = new JsonDataFormatDefinition();
- }
-
- @Override
- public void onInvocation(RabbitMqParameters parameters,
EventSinkRuntimeContext runtimeContext)
- throws SpRuntimeException {
- this.publisher = new RabbitMqPublisher(parameters);
- this.topic = parameters.getRabbitMqTopic();
-
- if (!this.publisher.isConnected()) {
- throw new SpRuntimeException("Could not establish conntection to
RabbitMQ broker. Host: "
- + parameters.getRabbitMqHost() + " Port: " +
parameters.getRabbitMqPort());
- }
- }
-
- @Override
- public void onEvent(Event inputEvent) {
- try {
- publisher.fire(dataFormatDefinition.fromMap(inputEvent.getRaw()),
- PlaceholderExtractor.replacePlaceholders(inputEvent, topic));
- } catch (SpRuntimeException e) {
- LOG.error("Could not serialiaze event");
- }
- }
-
- @Override
- public void onDetach() throws SpRuntimeException {
- publisher.cleanup();
- }
-}
diff --git
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqController.java
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqController.java
deleted file mode 100644
index b087f7114..000000000
---
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqController.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.sinks.brokers.jvm.rabbitmq;
-
-import org.apache.streampipes.model.DataSinkType;
-import org.apache.streampipes.model.graph.DataSinkDescription;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.sdk.builder.DataSinkBuilder;
-import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
-import org.apache.streampipes.sdk.helpers.EpRequirements;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventSink;
-import
org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventSinkDeclarer;
-
-public class RabbitMqController extends
StandaloneEventSinkDeclarer<RabbitMqParameters> {
-
- private static final String RABBITMQ_BROKER_SETTINGS_KEY = "broker-settings";
- private static final String TOPIC_KEY = "topic";
-
- private static final String HOST_KEY = "host";
- private static final String PORT_KEY = "port";
- private static final String USER_KEY = "user";
- private static final String PASSWORD_KEY = "password";
- private static final String EXCHANGE_NAME_KEY = "exchange-name";
-
-
-// private static final String RABBITMQ_HOST_URI =
"http://schema.org/rabbitMqHost";
-// private static final String RABBITMQ_PORT_URI =
"http://schema.org/rabbitMqPort";
-// private static final String RABBITMQ_USER_URI =
"http://schema.org/rabbitMqUser";
-// private static final String RABBITMQ_PASSWORD_URI =
"http://schema.org/rabbitMqPassword";
-// private static final String EXCHANGE_NAME_URI =
"http://schema.org/exchangeName";
-
- @Override
- public DataSinkDescription declareModel() {
- return
DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.rabbitmq")
- .category(DataSinkType.MESSAGING)
- .withLocales(Locales.EN)
- .withAssets(Assets.DOCUMENTATION, Assets.ICON)
- .requiredStream(StreamRequirementsBuilder
- .create()
- .requiredProperty(EpRequirements.anyProperty())
- .build())
- .requiredTextParameter(Labels.withId(TOPIC_KEY), false, false)
- .requiredTextParameter(Labels.withId(HOST_KEY), false, false)
- .requiredIntegerParameter(Labels.withId(PORT_KEY), 5672)
- .requiredTextParameter(Labels.withId(USER_KEY), false, false)
- .requiredSecret(Labels.withId(PASSWORD_KEY))
-// .requiredTextParameter(Labels.withId(EXCHANGE_NAME_KEY), false,
false)
-//
.requiredOntologyConcept(Labels.withId(RABBITMQ_BROKER_SETTINGS_KEY),
-// OntologyProperties.mandatory(RABBITMQ_HOST_URI),
-// OntologyProperties.mandatory(RABBITMQ_PORT_URI),
-// OntologyProperties.mandatory(RABBITMQ_USER_URI),
-// OntologyProperties.mandatory(RABBITMQ_PASSWORD_URI),
-// OntologyProperties.optional(EXCHANGE_NAME_URI))
- .build();
- }
-
- @Override
- public ConfiguredEventSink<RabbitMqParameters>
onInvocation(DataSinkInvocation graph,
-
DataSinkParameterExtractor extractor) {
- String publisherTopic = extractor.singleValueParameter(TOPIC_KEY,
String.class);
-
-// String rabbitMqHost =
extractor.supportedOntologyPropertyValue(RABBITMQ_BROKER_SETTINGS_KEY,
RABBITMQ_HOST_URI,
-// String.class);
-// Integer rabbitMqPort =
extractor.supportedOntologyPropertyValue(RABBITMQ_BROKER_SETTINGS_KEY,
RABBITMQ_PORT_URI,
-// Integer.class);
-// String rabbitMqUser =
extractor.supportedOntologyPropertyValue(RABBITMQ_BROKER_SETTINGS_KEY,
RABBITMQ_USER_URI,
-// String.class);
-// String rabbitMqPassword =
extractor.supportedOntologyPropertyValue(RABBITMQ_BROKER_SETTINGS_KEY,
-// RABBITMQ_PASSWORD_URI,
-// String.class);
-// String exchangeName =
extractor.supportedOntologyPropertyValue(RABBITMQ_BROKER_SETTINGS_KEY,
-// EXCHANGE_NAME_URI,
-// String.class);
-
- String rabbitMqHost = extractor.singleValueParameter(HOST_KEY,
String.class);
- Integer rabbitMqPort = extractor.singleValueParameter(PORT_KEY,
Integer.class);
- String rabbitMqUser = extractor.singleValueParameter(USER_KEY,
String.class);
- String rabbitMqPassword = extractor.secretValue(PASSWORD_KEY);
-// String exchangeName = extractor.singleValueParameter(EXCHANGE_NAME_KEY,
String.class);
- String exchangeName = "logs";
-
- RabbitMqParameters params = new RabbitMqParameters(graph, rabbitMqHost,
rabbitMqPort, publisherTopic,
- rabbitMqUser, rabbitMqPassword, exchangeName);
-
- return new ConfiguredEventSink<>(params, RabbitMqConsumer::new);
-
-
-
- }
-}
diff --git
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqParameters.java
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqParameters.java
index 865737897..fda4129ce 100644
---
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqParameters.java
+++
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqParameters.java
@@ -18,10 +18,7 @@
package org.apache.streampipes.sinks.brokers.jvm.rabbitmq;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
-
-public class RabbitMqParameters extends EventSinkBindingParams {
+public class RabbitMqParameters {
private String rabbitMqHost;
private Integer rabbitMqPort;
@@ -30,9 +27,8 @@ public class RabbitMqParameters extends
EventSinkBindingParams {
private String rabbitMqPassword;
private String exchangeName;
- public RabbitMqParameters(DataSinkInvocation graph, String rabbitMqHost,
Integer rabbitMqPort, String rabbitMqTopic,
+ public RabbitMqParameters(String rabbitMqHost, Integer rabbitMqPort, String
rabbitMqTopic,
String rabbitMqUser, String rabbitMqPassword,
String exchangeName) {
- super(graph);
this.rabbitMqHost = rabbitMqHost;
this.rabbitMqPort = rabbitMqPort;
this.rabbitMqTopic = rabbitMqTopic;
diff --git
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqPublisherSink.java
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqPublisherSink.java
new file mode 100644
index 000000000..4a603cb88
--- /dev/null
+++
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqPublisherSink.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.sinks.brokers.jvm.rabbitmq;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
+import org.apache.streampipes.model.DataSinkType;
+import org.apache.streampipes.model.graph.DataSinkDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.pe.shared.PlaceholderExtractor;
+import org.apache.streampipes.sdk.builder.DataSinkBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.helpers.EpRequirements;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
+import org.apache.streampipes.wrapper.standalone.SinkParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RabbitMqPublisherSink extends StreamPipesDataSink {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(RabbitMqPublisherSink.class);
+ private static final String TOPIC_KEY = "topic";
+
+ private static final String HOST_KEY = "host";
+ private static final String PORT_KEY = "port";
+ private static final String USER_KEY = "user";
+ private static final String PASSWORD_KEY = "password";
+ private static final String EXCHANGE_NAME_KEY = "exchange-name";
+
+ private RabbitMqPublisher publisher;
+ private JsonDataFormatDefinition dataFormatDefinition;
+ private String topic;
+
+
+ @Override
+ public DataSinkDescription declareModel() {
+ return
DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.rabbitmq")
+ .category(DataSinkType.MESSAGING)
+ .withLocales(Locales.EN)
+ .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+ .requiredStream(StreamRequirementsBuilder
+ .create()
+ .requiredProperty(EpRequirements.anyProperty())
+ .build())
+ .requiredTextParameter(Labels.withId(TOPIC_KEY), false, false)
+ .requiredTextParameter(Labels.withId(HOST_KEY), false, false)
+ .requiredIntegerParameter(Labels.withId(PORT_KEY), 5672)
+ .requiredTextParameter(Labels.withId(USER_KEY), false, false)
+ .requiredTextParameter(Labels.withId(EXCHANGE_NAME_KEY), false, false)
+ .requiredSecret(Labels.withId(PASSWORD_KEY))
+ .build();
+ }
+
+ @Override
+ public void onInvocation(SinkParams parameters,
+ EventSinkRuntimeContext runtimeContext) throws
SpRuntimeException {
+
+ var extractor = parameters.extractor();
+ this.dataFormatDefinition = new JsonDataFormatDefinition();
+ String publisherTopic = extractor.singleValueParameter(TOPIC_KEY,
String.class);
+
+ String rabbitMqHost = extractor.singleValueParameter(HOST_KEY,
String.class);
+ Integer rabbitMqPort = extractor.singleValueParameter(PORT_KEY,
Integer.class);
+ String rabbitMqUser = extractor.singleValueParameter(USER_KEY,
String.class);
+ String rabbitMqPassword = extractor.secretValue(PASSWORD_KEY);
+ String exchangeName = extractor.singleValueParameter(EXCHANGE_NAME_KEY,
String.class);
+
+ var rabbitMqParameters = new RabbitMqParameters(rabbitMqHost,
rabbitMqPort, publisherTopic,
+ rabbitMqUser, rabbitMqPassword, exchangeName);
+
+ this.publisher = new RabbitMqPublisher(rabbitMqParameters);
+ this.topic = rabbitMqParameters.getRabbitMqTopic();
+
+ if (!this.publisher.isConnected()) {
+ throw new SpRuntimeException("Could not establish conntection to
RabbitMQ broker. Host: "
+ + rabbitMqParameters.getRabbitMqHost() + " Port: " +
rabbitMqParameters.getRabbitMqPort());
+ }
+ }
+
+ @Override
+ public void onEvent(Event inputEvent) throws SpRuntimeException {
+ try {
+ publisher.fire(dataFormatDefinition.fromMap(inputEvent.getRaw()),
+ PlaceholderExtractor.replacePlaceholders(inputEvent, topic));
+ } catch (SpRuntimeException e) {
+ LOG.error("Could not serialiaze event");
+ }
+ }
+
+ @Override
+ public void onDetach() throws SpRuntimeException {
+ publisher.cleanup();
+ }
+}
diff --git
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestParameters.java
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestParameters.java
deleted file mode 100644
index 6dd2c14fd..000000000
---
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestParameters.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.sinks.brokers.jvm.rest;
-
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
-
-public class RestParameters extends EventSinkBindingParams {
-
- private String url;
-
- public RestParameters(DataSinkInvocation graph, String url) {
- super(graph);
- this.url = url;
- }
-
- public String getUrl() {
- return url;
- }
-}
diff --git
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestPublisher.java
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestPublisher.java
deleted file mode 100644
index 73cf829b5..000000000
---
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestPublisher.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.sinks.brokers.jvm.rest;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
-import org.apache.streampipes.logging.api.Logger;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
-import org.apache.streampipes.wrapper.runtime.EventSink;
-
-import org.apache.http.client.fluent.Request;
-import org.apache.http.entity.ContentType;
-
-import java.io.IOException;
-
-public class RestPublisher implements EventSink<RestParameters> {
- private static Logger logger;
-
- private String url;
- private JsonDataFormatDefinition jsonDataFormatDefinition;
-
- @Override
- public void onInvocation(RestParameters params, EventSinkRuntimeContext
runtimeContext) throws SpRuntimeException {
- this.url = params.getUrl();
- logger = params.getGraph().getLogger(RestPublisher.class);
- jsonDataFormatDefinition = new JsonDataFormatDefinition();
- }
-
- @Override
- public void onEvent(Event inputEvent) {
-
- byte[] json = null;
- try {
- json = jsonDataFormatDefinition.fromMap(inputEvent.getRaw());
- } catch (SpRuntimeException e) {
- logger.error("Error while serializing event: " +
inputEvent.getSourceInfo().getSourceId() + " Exception: "
- + e);
- }
-
- try {
- Request.Post(url)
- .bodyByteArray(json, ContentType.APPLICATION_JSON)
- .connectTimeout(1000)
- .socketTimeout(100000)
- .execute().returnContent().asString();
- } catch (IOException e) {
- logger.error("Error while sending data to endpoint: " + url + "
Exception: " + e);
- }
- }
-
- @Override
- public void onDetach() throws SpRuntimeException {
- }
-}
diff --git
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestController.java
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestSink.java
similarity index 51%
rename from
streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestController.java
rename to
streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestSink.java
index c6ccd48d5..583b04fca 100644
---
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestController.java
+++
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestSink.java
@@ -19,23 +19,37 @@
package org.apache.streampipes.sinks.brokers.jvm.rest;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
import org.apache.streampipes.model.DataSinkType;
import org.apache.streampipes.model.graph.DataSinkDescription;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventSink;
-import
org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventSinkDeclarer;
+import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
+import org.apache.streampipes.wrapper.standalone.SinkParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
-public class RestController extends
StandaloneEventSinkDeclarer<RestParameters> {
+import org.apache.http.client.fluent.Request;
+import org.apache.http.entity.ContentType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class RestSink extends StreamPipesDataSink {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RestSink.class);
private static final String URL_KEY = "url-key";
+ private String url;
+ private JsonDataFormatDefinition jsonDataFormatDefinition;
+
@Override
public DataSinkDescription declareModel() {
return
DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.rest")
@@ -52,13 +66,35 @@ public class RestController extends
StandaloneEventSinkDeclarer<RestParameters>
}
@Override
- public ConfiguredEventSink<RestParameters> onInvocation(DataSinkInvocation
graph,
-
DataSinkParameterExtractor extractor) {
+ public void onDetach() throws SpRuntimeException {
- String url = extractor.singleValueParameter(URL_KEY, String.class);
+ }
- RestParameters params = new RestParameters(graph, url);
+ @Override
+ public void onInvocation(SinkParams parameters,
+ EventSinkRuntimeContext runtimeContext) throws
SpRuntimeException {
+ jsonDataFormatDefinition = new JsonDataFormatDefinition();
+ url = parameters.extractor().singleValueParameter(URL_KEY, String.class);
+ }
+
+ @Override
+ public void onEvent(Event inputEvent) throws SpRuntimeException {
+ byte[] json = null;
+ try {
+ json = jsonDataFormatDefinition.fromMap(inputEvent.getRaw());
+ } catch (SpRuntimeException e) {
+ LOG.error("Error while serializing event: " +
inputEvent.getSourceInfo().getSourceId() + " Exception: "
+ + e);
+ }
- return new ConfiguredEventSink<>(params, RestPublisher::new);
+ try {
+ Request.Post(url)
+ .bodyByteArray(json, ContentType.APPLICATION_JSON)
+ .connectTimeout(1000)
+ .socketTimeout(100000)
+ .execute().returnContent().asString();
+ } catch (IOException e) {
+ LOG.error("Error while sending data to endpoint: " + url + " Exception:
" + e);
+ }
}
}