This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch 1614-migrate-data-sinks-in-brokers-jvm-module
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to 
refs/heads/1614-migrate-data-sinks-in-brokers-jvm-module by this push:
     new 42617e3ba Migrate data sinks in brokers-jvm module (#1614)
42617e3ba is described below

commit 42617e3ba29a25e288486d5ca3d6adeb60e9d735
Author: Dominik Riemer <[email protected]>
AuthorDate: Fri May 26 17:23:09 2023 +0200

    Migrate data sinks in brokers-jvm module (#1614)
---
 .../sinks/brokers/jvm/BrokersJvmInit.java          |  16 +--
 .../sinks/brokers/jvm/bufferrest/BufferRest.java   |  84 ---------------
 .../jvm/bufferrest/BufferRestParameters.java       |  51 ---------
 ...ontroller.java => BufferRestPublisherSink.java} |  68 ++++++++++--
 .../sinks/brokers/jvm/jms/JmsParameters.java       |  48 ---------
 .../sinks/brokers/jvm/jms/JmsPublisher.java        |  70 -------------
 .../{JmsController.java => JmsPublisherSink.java}  |  61 +++++++----
 .../sinks/brokers/jvm/nats/NatsController.java     |  76 +++++++++++---
 .../sinks/brokers/jvm/nats/NatsParameters.java     |  38 -------
 .../sinks/brokers/jvm/nats/NatsPublisher.java      |  83 ---------------
 .../brokers/jvm/rabbitmq/RabbitMqConsumer.java     |  69 -------------
 .../brokers/jvm/rabbitmq/RabbitMqController.java   | 110 --------------------
 .../brokers/jvm/rabbitmq/RabbitMqParameters.java   |   8 +-
 .../jvm/rabbitmq/RabbitMqPublisherSink.java        | 115 +++++++++++++++++++++
 .../sinks/brokers/jvm/rest/RestParameters.java     |  36 -------
 .../sinks/brokers/jvm/rest/RestPublisher.java      |  71 -------------
 .../rest/{RestController.java => RestSink.java}    |  56 ++++++++--
 17 files changed, 329 insertions(+), 731 deletions(-)

diff --git 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java
 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java
index 1e07fc515..f3920b0e6 100644
--- 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java
+++ 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java
@@ -28,14 +28,14 @@ import 
org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
 import org.apache.streampipes.service.extensions.ExtensionsModelSubmitter;
-import 
org.apache.streampipes.sinks.brokers.jvm.bufferrest.BufferRestController;
-import org.apache.streampipes.sinks.brokers.jvm.jms.JmsController;
+import 
org.apache.streampipes.sinks.brokers.jvm.bufferrest.BufferRestPublisherSink;
+import org.apache.streampipes.sinks.brokers.jvm.jms.JmsPublisherSink;
 import org.apache.streampipes.sinks.brokers.jvm.kafka.KafkaPublishSink;
 import org.apache.streampipes.sinks.brokers.jvm.mqtt.MqttPublisherSink;
 import org.apache.streampipes.sinks.brokers.jvm.nats.NatsController;
 import org.apache.streampipes.sinks.brokers.jvm.pulsar.PulsarPublisherSink;
-import org.apache.streampipes.sinks.brokers.jvm.rabbitmq.RabbitMqController;
-import org.apache.streampipes.sinks.brokers.jvm.rest.RestController;
+import org.apache.streampipes.sinks.brokers.jvm.rabbitmq.RabbitMqPublisherSink;
+import org.apache.streampipes.sinks.brokers.jvm.rest.RestSink;
 import org.apache.streampipes.sinks.brokers.jvm.rocketmq.RocketMQPublisherSink;
 import org.apache.streampipes.sinks.brokers.jvm.tubemq.TubeMQPublisherSink;
 import org.apache.streampipes.sinks.brokers.jvm.websocket.WebsocketServerSink;
@@ -54,10 +54,10 @@ public class BrokersJvmInit extends 
ExtensionsModelSubmitter {
             8096)
         .registerPipelineElements(
             new KafkaPublishSink(),
-            new JmsController(),
-            new RestController(),
-            new BufferRestController(),
-            new RabbitMqController(),
+            new JmsPublisherSink(),
+            new RestSink(),
+            new BufferRestPublisherSink(),
+            new RabbitMqPublisherSink(),
             new MqttPublisherSink(),
             new WebsocketServerSink(),
             new PulsarPublisherSink(),
diff --git 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/bufferrest/BufferRest.java
 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/bufferrest/BufferRest.java
deleted file mode 100644
index 20ff4490f..000000000
--- 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/bufferrest/BufferRest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.sinks.brokers.jvm.bufferrest;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.dataformat.SpDataFormatDefinition;
-import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
-import org.apache.streampipes.model.runtime.Event;
-import 
org.apache.streampipes.sinks.brokers.jvm.bufferrest.buffer.BufferListener;
-import 
org.apache.streampipes.sinks.brokers.jvm.bufferrest.buffer.MessageBuffer;
-import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
-import org.apache.streampipes.wrapper.runtime.EventSink;
-
-import org.apache.commons.io.Charsets;
-import org.apache.http.client.fluent.Request;
-import org.apache.http.entity.StringEntity;
-import org.slf4j.Logger;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-
-public class BufferRest implements EventSink<BufferRestParameters>, 
BufferListener {
-
-  private static Logger log;
-  private List<String> fieldsToSend;
-  private SpDataFormatDefinition dataFormatDefinition;
-  private String restEndpointURI;
-  private MessageBuffer buffer;
-
-  public BufferRest() {
-    this.dataFormatDefinition = new JsonDataFormatDefinition();
-  }
-
-  @Override
-  public void onInvocation(BufferRestParameters parameters, 
EventSinkRuntimeContext runtimeContext) {
-    this.fieldsToSend = parameters.getFieldsToSend();
-    this.restEndpointURI = parameters.getRestEndpointURI();
-    this.buffer = new MessageBuffer(parameters.getBufferSize());
-    this.buffer.addListener(this);
-  }
-
-  @Override
-  public void onEvent(Event event) {
-    Map<String, Object> outEventMap = event.getSubset(fieldsToSend).getRaw();
-    try {
-      String json = new String(dataFormatDefinition.fromMap(outEventMap));
-      this.buffer.addMessage(json);
-    } catch (SpRuntimeException e) {
-      log.error("Could not parse incoming event");
-    }
-  }
-
-  @Override
-  public void onDetach() {
-    buffer.removeListener(this);
-  }
-
-  @Override
-  public void bufferFull(String messagesJsonArray) {
-    try {
-      Request.Post(restEndpointURI).body(new StringEntity(messagesJsonArray, 
Charsets.UTF_8)).execute();
-    } catch (IOException e) {
-      log.error("Could not reach endpoint at {}", restEndpointURI);
-    }
-  }
-}
diff --git 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/bufferrest/BufferRestParameters.java
 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/bufferrest/BufferRestParameters.java
deleted file mode 100644
index ac675422f..000000000
--- 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/bufferrest/BufferRestParameters.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.sinks.brokers.jvm.bufferrest;
-
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
-
-import java.util.List;
-
-public class BufferRestParameters extends EventSinkBindingParams {
-
-  private String restEndpointURI;
-  private List<String> fieldsToSend;
-  private int bufferSize;
-
-  public BufferRestParameters(DataSinkInvocation graph, List<String> 
fieldsToSend, String restEndpointURI,
-                              int bufferSize) {
-    super(graph);
-    this.fieldsToSend = fieldsToSend;
-    this.restEndpointURI = restEndpointURI;
-    this.bufferSize = bufferSize;
-  }
-
-  public List<String> getFieldsToSend() {
-    return fieldsToSend;
-  }
-
-  public String getRestEndpointURI() {
-    return restEndpointURI;
-  }
-
-  public int getBufferSize() {
-    return bufferSize;
-  }
-}
diff --git 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/bufferrest/BufferRestController.java
 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/bufferrest/BufferRestPublisherSink.java
similarity index 50%
rename from 
streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/bufferrest/BufferRestController.java
rename to 
streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/bufferrest/BufferRestPublisherSink.java
index a8cd11e31..7ae978afc 100644
--- 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/bufferrest/BufferRestController.java
+++ 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/bufferrest/BufferRestPublisherSink.java
@@ -18,28 +18,48 @@
 
 package org.apache.streampipes.sinks.brokers.jvm.bufferrest;
 
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.dataformat.SpDataFormatDefinition;
+import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
 import org.apache.streampipes.model.DataSinkType;
 import org.apache.streampipes.model.graph.DataSinkDescription;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.sdk.builder.DataSinkBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventSink;
-import 
org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventSinkDeclarer;
+import 
org.apache.streampipes.sinks.brokers.jvm.bufferrest.buffer.BufferListener;
+import 
org.apache.streampipes.sinks.brokers.jvm.bufferrest.buffer.MessageBuffer;
+import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
+import org.apache.streampipes.wrapper.standalone.SinkParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
 
+import org.apache.commons.io.Charsets;
+import org.apache.http.client.fluent.Request;
+import org.apache.http.entity.StringEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
 import java.util.List;
+import java.util.Map;
+
+public class BufferRestPublisherSink extends StreamPipesDataSink implements 
BufferListener {
 
-public class BufferRestController extends 
StandaloneEventSinkDeclarer<BufferRestParameters> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BufferRestPublisherSink.class);
 
   private static final String KEY = "bufferrest";
   private static final String URI = ".uri";
   private static final String COUNT = ".count";
   private static final String FIELDS = ".fields-to-send";
 
+  private List<String> fieldsToSend;
+  private SpDataFormatDefinition dataFormatDefinition;
+  private String restEndpointURI;
+  private MessageBuffer buffer;
+
   @Override
   public DataSinkDescription declareModel() {
     return 
DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.bufferrest")
@@ -58,15 +78,41 @@ public class BufferRestController extends 
StandaloneEventSinkDeclarer<BufferRest
   }
 
   @Override
-  public ConfiguredEventSink<BufferRestParameters> 
onInvocation(DataSinkInvocation graph,
-                                                                
DataSinkParameterExtractor extractor) {
+  public void onInvocation(SinkParams parameters,
+                           EventSinkRuntimeContext runtimeContext) throws 
SpRuntimeException {
 
-    List<String> fieldsToSend = extractor.mappingPropertyValues(KEY + FIELDS);
-    String restEndpointURI = extractor.singleValueParameter(KEY + URI, 
String.class);
+    var extractor = parameters.extractor();
+    fieldsToSend = extractor.mappingPropertyValues(KEY + FIELDS);
+    restEndpointURI = extractor.singleValueParameter(KEY + URI, String.class);
     int bufferSize = Integer.parseInt(extractor.singleValueParameter(KEY + 
COUNT, String.class));
+    this.dataFormatDefinition = new JsonDataFormatDefinition();
 
-    BufferRestParameters params = new BufferRestParameters(graph, 
fieldsToSend, restEndpointURI, bufferSize);
+    this.buffer = new MessageBuffer(bufferSize);
+    this.buffer.addListener(this);
+  }
 
-    return new ConfiguredEventSink<>(params, BufferRest::new);
+  @Override
+  public void onEvent(Event event) throws SpRuntimeException {
+    Map<String, Object> outEventMap = event.getSubset(fieldsToSend).getRaw();
+    try {
+      String json = new String(dataFormatDefinition.fromMap(outEventMap));
+      this.buffer.addMessage(json);
+    } catch (SpRuntimeException e) {
+      LOG.error("Could not parse incoming event");
+    }
+  }
+
+  @Override
+  public void onDetach() throws SpRuntimeException {
+    buffer.removeListener(this);
+  }
+
+  @Override
+  public void bufferFull(String messagesJsonArray) {
+    try {
+      Request.Post(restEndpointURI).body(new StringEntity(messagesJsonArray, 
Charsets.UTF_8)).execute();
+    } catch (IOException e) {
+      LOG.error("Could not reach endpoint at {}", restEndpointURI);
+    }
   }
 }
diff --git 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsParameters.java
 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsParameters.java
deleted file mode 100644
index 1cc724461..000000000
--- 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsParameters.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.sinks.brokers.jvm.jms;
-
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
-
-public class JmsParameters extends EventSinkBindingParams {
-
-  private String jmsHost;
-  private Integer jmsPort;
-  private String topic;
-
-  public JmsParameters(DataSinkInvocation graph, String jmsHost, Integer 
jmsPort, String topic) {
-    super(graph);
-    this.jmsHost = jmsHost;
-    this.jmsPort = jmsPort;
-    this.topic = topic;
-  }
-
-  public String getJmsHost() {
-    return jmsHost;
-  }
-
-  public Integer getJmsPort() {
-    return jmsPort;
-  }
-
-  public String getTopic() {
-    return topic;
-  }
-}
diff --git 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsPublisher.java
 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsPublisher.java
deleted file mode 100644
index 45107dd32..000000000
--- 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsPublisher.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.sinks.brokers.jvm.jms;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
-import org.apache.streampipes.messaging.jms.ActiveMQPublisher;
-import org.apache.streampipes.model.grounding.JmsTransportProtocol;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
-import org.apache.streampipes.wrapper.runtime.EventSink;
-
-import java.util.Map;
-
-public class JmsPublisher implements EventSink<JmsParameters> {
-
-  private ActiveMQPublisher publisher;
-  private JsonDataFormatDefinition jsonDataFormatDefinition;
-
-  public JmsPublisher() {
-    this.jsonDataFormatDefinition = new JsonDataFormatDefinition();
-  }
-
-  @Override
-  public void onInvocation(JmsParameters params, EventSinkRuntimeContext 
runtimeContext) throws SpRuntimeException {
-//    this.publisher = new ActiveMQPublisher(params.getJmsHost() + ":" + 
params.getJmsPort(), params.getTopic());
-    this.publisher = new ActiveMQPublisher();
-    JmsTransportProtocol jmsTransportProtocol =
-        new JmsTransportProtocol(params.getJmsHost(), params.getJmsPort(), 
params.getTopic());
-    this.publisher.connect(jmsTransportProtocol);
-    if (!this.publisher.isConnected()) {
-      throw new SpRuntimeException(
-          "Could not connect to JMS server " + params.getJmsHost() + " on 
Port: " + params.getJmsPort()
-              + " to topic: " + params.getTopic());
-    }
-
-
-  }
-
-  @Override
-  public void onEvent(Event inputEvent) {
-    try {
-      Map<String, Object> event = inputEvent.getRaw();
-      this.publisher.publish(jsonDataFormatDefinition.fromMap(event));
-    } catch (SpRuntimeException e) {
-      e.printStackTrace();
-    }
-  }
-
-  @Override
-  public void onDetach() throws SpRuntimeException {
-    this.publisher.disconnect();
-  }
-}
diff --git 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsController.java
 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsPublisherSink.java
similarity index 57%
rename from 
streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsController.java
rename to 
streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsPublisherSink.java
index 895b49e5d..0342cd6be 100644
--- 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsController.java
+++ 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsPublisherSink.java
@@ -19,28 +19,33 @@
 package org.apache.streampipes.sinks.brokers.jvm.jms;
 
 
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
+import org.apache.streampipes.messaging.jms.ActiveMQPublisher;
 import org.apache.streampipes.model.DataSinkType;
 import org.apache.streampipes.model.graph.DataSinkDescription;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.grounding.JmsTransportProtocol;
+import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.sdk.builder.DataSinkBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventSink;
-import 
org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventSinkDeclarer;
+import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
+import org.apache.streampipes.wrapper.standalone.SinkParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
 
-public class JmsController extends StandaloneEventSinkDeclarer<JmsParameters> {
+import java.util.Map;
+
+public class JmsPublisherSink extends StreamPipesDataSink {
 
-  private static final String JMS_BROKER_SETTINGS_KEY = "broker-settings";
   private static final String TOPIC_KEY = "topic";
   private static final String HOST_KEY = "host";
   private static final String PORT_KEY = "port";
 
-//  private static final String JMS_HOST_URI = "http://schema.org/jmsHost";;
-//  private static final String JMS_PORT_URI = "http://schema.org/jmsPort";;
+  private ActiveMQPublisher publisher;
+  private JsonDataFormatDefinition jsonDataFormatDefinition;
 
   @Override
   public DataSinkDescription declareModel() {
@@ -55,27 +60,43 @@ public class JmsController extends 
StandaloneEventSinkDeclarer<JmsParameters> {
         .requiredTextParameter(Labels.withId(TOPIC_KEY), false, false)
         .requiredTextParameter(Labels.withId(HOST_KEY), false, false)
         .requiredIntegerParameter(Labels.withId(PORT_KEY), 61616)
-//            .requiredOntologyConcept(Labels.withId(JMS_BROKER_SETTINGS_KEY),
-//                    OntologyProperties.mandatory(JMS_HOST_URI),
-//                    OntologyProperties.mandatory(JMS_PORT_URI))
         .build();
   }
 
   @Override
-  public ConfiguredEventSink<JmsParameters> onInvocation(DataSinkInvocation 
graph,
-                                                         
DataSinkParameterExtractor extractor) {
+  public void onInvocation(SinkParams parameters,
+                           EventSinkRuntimeContext runtimeContext) throws 
SpRuntimeException {
 
-    String topic = extractor.singleValueParameter(TOPIC_KEY, String.class);
+    var extractor = parameters.extractor();
+    this.jsonDataFormatDefinition = new JsonDataFormatDefinition();
 
     String jmsHost = extractor.singleValueParameter(HOST_KEY, String.class);
     Integer jmsPort = extractor.singleValueParameter(PORT_KEY, Integer.class);
-//    String jmsHost = 
extractor.supportedOntologyPropertyValue(JMS_BROKER_SETTINGS_KEY, JMS_HOST_URI,
-//            String.class);
-//    Integer jmsPort = 
extractor.supportedOntologyPropertyValue(JMS_BROKER_SETTINGS_KEY, JMS_PORT_URI,
-//            Integer.class);
+    String topic = extractor.singleValueParameter(TOPIC_KEY, String.class);
+
+    this.publisher = new ActiveMQPublisher();
+    JmsTransportProtocol jmsTransportProtocol =
+        new JmsTransportProtocol(jmsHost, jmsPort, topic);
+    this.publisher.connect(jmsTransportProtocol);
+    if (!this.publisher.isConnected()) {
+      throw new SpRuntimeException(
+          "Could not connect to JMS server " + jmsHost + " on Port: " + jmsPort
+              + " to topic: " + topic);
+    }
+  }
 
-    JmsParameters params = new JmsParameters(graph, jmsHost, jmsPort, topic);
+  @Override
+  public void onEvent(Event inputEvent) throws SpRuntimeException {
+    try {
+      Map<String, Object> event = inputEvent.getRaw();
+      this.publisher.publish(jsonDataFormatDefinition.fromMap(event));
+    } catch (SpRuntimeException e) {
+      e.printStackTrace();
+    }
+  }
 
-    return new ConfiguredEventSink<>(params, JmsPublisher::new);
+  @Override
+  public void onDetach() throws SpRuntimeException {
+    this.publisher.disconnect();
   }
 }
diff --git 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/nats/NatsController.java
 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/nats/NatsController.java
index 767fd0e11..90bc8361f 100644
--- 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/nats/NatsController.java
+++ 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/nats/NatsController.java
@@ -18,26 +18,41 @@
 
 package org.apache.streampipes.sinks.brokers.jvm.nats;
 
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
+import org.apache.streampipes.messaging.nats.NatsUtils;
 import org.apache.streampipes.model.DataSinkType;
 import org.apache.streampipes.model.graph.DataSinkDescription;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
 import org.apache.streampipes.model.nats.NatsConfig;
+import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
 import org.apache.streampipes.pe.shared.config.nats.NatsConfigUtils;
 import org.apache.streampipes.sdk.StaticProperties;
 import org.apache.streampipes.sdk.builder.DataSinkBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
 import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
 import org.apache.streampipes.sdk.helpers.Alternatives;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventSink;
-import 
org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventSinkDeclarer;
+import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
+import org.apache.streampipes.wrapper.standalone.SinkParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
 
-public class NatsController extends 
StandaloneEventSinkDeclarer<NatsParameters> {
+import io.nats.client.Connection;
+import io.nats.client.Nats;
+import io.nats.client.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
+public class NatsController extends StreamPipesDataSink {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(NatsController.class);
 
   private static final String SUBJECT_KEY = "subject";
   private static final String URLS_KEY = "natsUrls";
@@ -55,6 +70,10 @@ public class NatsController extends 
StandaloneEventSinkDeclarer<NatsParameters>
   private static final String CONNECTION_PROPERTIES_GROUP = "connection-group";
   private static final String PROPERTIES_KEY = "properties";
 
+  private String subject;
+  private Connection natsConnection;
+  private JsonDataFormatDefinition dataFormatDefinition;
+
   @Override
   public DataSinkDescription declareModel() {
     return 
DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.nats")
@@ -74,17 +93,6 @@ public class NatsController extends 
StandaloneEventSinkDeclarer<NatsParameters>
         .build();
   }
 
-  @Override
-  public ConfiguredEventSink<NatsParameters> onInvocation(DataSinkInvocation 
graph,
-                                                          
DataSinkParameterExtractor extractor) {
-
-    NatsConfig natsConfig = 
NatsConfigUtils.from(StaticPropertyExtractor.from(graph.getStaticProperties()));
-
-    NatsParameters params = new NatsParameters(graph, natsConfig);
-
-    return new ConfiguredEventSink<>(params, NatsPublisher::new);
-  }
-
   public static StaticPropertyAlternative getAccessModeAlternativesOne() {
     return Alternatives.from(Labels.withId(ANONYMOUS_ACCESS));
 
@@ -109,4 +117,40 @@ public class NatsController extends 
StandaloneEventSinkDeclarer<NatsParameters>
             
StaticProperties.stringFreeTextProperty(Labels.withId(PROPERTIES_KEY))));
 
   }
+
+  @Override
+  public void onInvocation(SinkParams parameters,
+                           EventSinkRuntimeContext runtimeContext) throws 
SpRuntimeException {
+    this.dataFormatDefinition = new JsonDataFormatDefinition();
+    NatsConfig natsConfig = NatsConfigUtils.from(
+        
StaticPropertyExtractor.from(parameters.getGraph().getStaticProperties()));
+    this.subject = natsConfig.getSubject();
+    Options options = NatsUtils.makeNatsOptions(natsConfig);
+
+    try {
+      this.natsConnection = Nats.connect(options);
+    } catch (Exception e) {
+      LOG.error("Error when connecting to the Nats broker on " + 
natsConfig.getNatsUrls() + " . " + e);
+    }
+  }
+
+  @Override
+  public void onEvent(Event inputEvent) throws SpRuntimeException {
+    try {
+      Map<String, Object> event = inputEvent.getRaw();
+      natsConnection.publish(subject, dataFormatDefinition.fromMap(event));
+    } catch (SpRuntimeException e) {
+      LOG.error("Could not publish events to Nats broker. " + e);
+    }
+  }
+
+  @Override
+  public void onDetach() throws SpRuntimeException {
+    try {
+      natsConnection.flush(Duration.ofMillis(50));
+      natsConnection.close();
+    } catch (TimeoutException | InterruptedException e) {
+      LOG.error("Error when disconnecting with Nats broker. " + e);
+    }
+  }
 }
diff --git 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/nats/NatsParameters.java
 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/nats/NatsParameters.java
deleted file mode 100644
index 4f201f746..000000000
--- 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/nats/NatsParameters.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.sinks.brokers.jvm.nats;
-
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.model.nats.NatsConfig;
-import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
-
-public class NatsParameters extends EventSinkBindingParams {
-
-  private final NatsConfig natsConfig;
-
-  public NatsParameters(DataSinkInvocation graph,
-                        NatsConfig natsConfig) {
-    super(graph);
-    this.natsConfig = natsConfig;
-  }
-
-  public NatsConfig getNatsConfig() {
-    return natsConfig;
-  }
-}
diff --git 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/nats/NatsPublisher.java
 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/nats/NatsPublisher.java
deleted file mode 100644
index 41cffa4e7..000000000
--- 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/nats/NatsPublisher.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.sinks.brokers.jvm.nats;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
-import org.apache.streampipes.logging.api.Logger;
-import org.apache.streampipes.messaging.nats.NatsUtils;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
-import org.apache.streampipes.wrapper.runtime.EventSink;
-
-import io.nats.client.Connection;
-import io.nats.client.Nats;
-import io.nats.client.Options;
-
-import java.time.Duration;
-import java.util.Map;
-import java.util.concurrent.TimeoutException;
-
-public class NatsPublisher implements EventSink<NatsParameters> {
-
-  private String subject;
-  private Connection natsConnection;
-  private JsonDataFormatDefinition dataFormatDefinition;
-  private static Logger log;
-
-  public NatsPublisher() {
-    this.dataFormatDefinition = new JsonDataFormatDefinition();
-  }
-
-  @Override
-  public void onInvocation(NatsParameters parameters, EventSinkRuntimeContext 
runtimeContext)
-      throws SpRuntimeException {
-
-    log = parameters.getGraph().getLogger(NatsPublisher.class);
-    var natsConfig = parameters.getNatsConfig();
-    this.subject = natsConfig.getSubject();
-    Options options = NatsUtils.makeNatsOptions(natsConfig);
-
-    try {
-      this.natsConnection = Nats.connect(options);
-    } catch (Exception e) {
-      log.error("Error when connecting to the Nats broker on " + 
natsConfig.getNatsUrls() + " . " + e.toString());
-    }
-  }
-
-  @Override
-  public void onEvent(Event inputEvent) {
-    try {
-      Map<String, Object> event = inputEvent.getRaw();
-      natsConnection.publish(subject, dataFormatDefinition.fromMap(event));
-    } catch (SpRuntimeException e) {
-      log.error("Could not publish events to Nats broker. " + e.toString());
-    }
-  }
-
-  @Override
-  public void onDetach() throws SpRuntimeException {
-    try {
-      natsConnection.flush(Duration.ofMillis(50));
-      natsConnection.close();
-    } catch (TimeoutException | InterruptedException e) {
-      log.error("Error when disconnecting with Nats broker. " + e.toString());
-    }
-  }
-}
diff --git 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqConsumer.java
 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqConsumer.java
deleted file mode 100644
index 0856da542..000000000
--- 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqConsumer.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.sinks.brokers.jvm.rabbitmq;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.pe.shared.PlaceholderExtractor;
-import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
-import org.apache.streampipes.wrapper.runtime.EventSink;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class RabbitMqConsumer implements EventSink<RabbitMqParameters> {
-
-  private RabbitMqPublisher publisher;
-  private JsonDataFormatDefinition dataFormatDefinition;
-  private String topic;
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(RabbitMqConsumer.class);
-
-  public RabbitMqConsumer() {
-    this.dataFormatDefinition = new JsonDataFormatDefinition();
-  }
-
-  @Override
-  public void onInvocation(RabbitMqParameters parameters, 
EventSinkRuntimeContext runtimeContext)
-      throws SpRuntimeException {
-    this.publisher = new RabbitMqPublisher(parameters);
-    this.topic = parameters.getRabbitMqTopic();
-
-    if (!this.publisher.isConnected()) {
-      throw new SpRuntimeException("Could not establish conntection to 
RabbitMQ broker. Host: "
-          + parameters.getRabbitMqHost() + " Port: " + 
parameters.getRabbitMqPort());
-    }
-  }
-
-  @Override
-  public void onEvent(Event inputEvent) {
-    try {
-      publisher.fire(dataFormatDefinition.fromMap(inputEvent.getRaw()),
-          PlaceholderExtractor.replacePlaceholders(inputEvent, topic));
-    } catch (SpRuntimeException e) {
-      LOG.error("Could not serialiaze event");
-    }
-  }
-
-  @Override
-  public void onDetach() throws SpRuntimeException {
-    publisher.cleanup();
-  }
-}
diff --git 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqController.java
 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqController.java
deleted file mode 100644
index b087f7114..000000000
--- 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqController.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.sinks.brokers.jvm.rabbitmq;
-
-import org.apache.streampipes.model.DataSinkType;
-import org.apache.streampipes.model.graph.DataSinkDescription;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.sdk.builder.DataSinkBuilder;
-import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
-import org.apache.streampipes.sdk.helpers.EpRequirements;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventSink;
-import 
org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventSinkDeclarer;
-
-public class RabbitMqController extends 
StandaloneEventSinkDeclarer<RabbitMqParameters> {
-
-  private static final String RABBITMQ_BROKER_SETTINGS_KEY = "broker-settings";
-  private static final String TOPIC_KEY = "topic";
-
-  private static final String HOST_KEY = "host";
-  private static final String PORT_KEY = "port";
-  private static final String USER_KEY = "user";
-  private static final String PASSWORD_KEY = "password";
-  private static final String EXCHANGE_NAME_KEY = "exchange-name";
-
-
-//  private static final String RABBITMQ_HOST_URI = 
"http://schema.org/rabbitMqHost";;
-//  private static final String RABBITMQ_PORT_URI = 
"http://schema.org/rabbitMqPort";;
-//  private static final String RABBITMQ_USER_URI = 
"http://schema.org/rabbitMqUser";;
-//  private static final String RABBITMQ_PASSWORD_URI = 
"http://schema.org/rabbitMqPassword";;
-//  private static final String EXCHANGE_NAME_URI = 
"http://schema.org/exchangeName";;
-
-  @Override
-  public DataSinkDescription declareModel() {
-    return 
DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.rabbitmq")
-            .category(DataSinkType.MESSAGING)
-            .withLocales(Locales.EN)
-            .withAssets(Assets.DOCUMENTATION, Assets.ICON)
-            .requiredStream(StreamRequirementsBuilder
-                    .create()
-                    .requiredProperty(EpRequirements.anyProperty())
-                    .build())
-            .requiredTextParameter(Labels.withId(TOPIC_KEY), false, false)
-            .requiredTextParameter(Labels.withId(HOST_KEY), false, false)
-            .requiredIntegerParameter(Labels.withId(PORT_KEY), 5672)
-            .requiredTextParameter(Labels.withId(USER_KEY), false, false)
-            .requiredSecret(Labels.withId(PASSWORD_KEY))
-//            .requiredTextParameter(Labels.withId(EXCHANGE_NAME_KEY), false, 
false)
-//            
.requiredOntologyConcept(Labels.withId(RABBITMQ_BROKER_SETTINGS_KEY),
-//                    OntologyProperties.mandatory(RABBITMQ_HOST_URI),
-//                    OntologyProperties.mandatory(RABBITMQ_PORT_URI),
-//                    OntologyProperties.mandatory(RABBITMQ_USER_URI),
-//                    OntologyProperties.mandatory(RABBITMQ_PASSWORD_URI),
-//                    OntologyProperties.optional(EXCHANGE_NAME_URI))
-            .build();
-  }
-
-  @Override
-  public ConfiguredEventSink<RabbitMqParameters> 
onInvocation(DataSinkInvocation graph,
-                                                              
DataSinkParameterExtractor extractor) {
-    String publisherTopic = extractor.singleValueParameter(TOPIC_KEY, 
String.class);
-
-//    String rabbitMqHost = 
extractor.supportedOntologyPropertyValue(RABBITMQ_BROKER_SETTINGS_KEY, 
RABBITMQ_HOST_URI,
-//            String.class);
-//    Integer rabbitMqPort = 
extractor.supportedOntologyPropertyValue(RABBITMQ_BROKER_SETTINGS_KEY, 
RABBITMQ_PORT_URI,
-//            Integer.class);
-//    String rabbitMqUser = 
extractor.supportedOntologyPropertyValue(RABBITMQ_BROKER_SETTINGS_KEY, 
RABBITMQ_USER_URI,
-//            String.class);
-//    String rabbitMqPassword = 
extractor.supportedOntologyPropertyValue(RABBITMQ_BROKER_SETTINGS_KEY,
-//            RABBITMQ_PASSWORD_URI,
-//            String.class);
-//    String exchangeName = 
extractor.supportedOntologyPropertyValue(RABBITMQ_BROKER_SETTINGS_KEY,
-//            EXCHANGE_NAME_URI,
-//            String.class);
-
-    String rabbitMqHost = extractor.singleValueParameter(HOST_KEY, 
String.class);
-    Integer rabbitMqPort = extractor.singleValueParameter(PORT_KEY, 
Integer.class);
-    String rabbitMqUser = extractor.singleValueParameter(USER_KEY, 
String.class);
-    String rabbitMqPassword = extractor.secretValue(PASSWORD_KEY);
-//    String exchangeName = extractor.singleValueParameter(EXCHANGE_NAME_KEY, 
String.class);
-    String exchangeName = "logs";
-
-    RabbitMqParameters params = new RabbitMqParameters(graph, rabbitMqHost, 
rabbitMqPort, publisherTopic,
-            rabbitMqUser, rabbitMqPassword, exchangeName);
-
-    return new ConfiguredEventSink<>(params, RabbitMqConsumer::new);
-
-
-
-  }
-}
diff --git 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqParameters.java
 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqParameters.java
index 865737897..fda4129ce 100644
--- 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqParameters.java
+++ 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqParameters.java
@@ -18,10 +18,7 @@
 
 package org.apache.streampipes.sinks.brokers.jvm.rabbitmq;
 
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
-
-public class RabbitMqParameters extends EventSinkBindingParams {
+public class RabbitMqParameters {
 
   private String rabbitMqHost;
   private Integer rabbitMqPort;
@@ -30,9 +27,8 @@ public class RabbitMqParameters extends 
EventSinkBindingParams {
   private String rabbitMqPassword;
   private String exchangeName;
 
-  public RabbitMqParameters(DataSinkInvocation graph, String rabbitMqHost, 
Integer rabbitMqPort, String rabbitMqTopic,
+  public RabbitMqParameters(String rabbitMqHost, Integer rabbitMqPort, String 
rabbitMqTopic,
                             String rabbitMqUser, String rabbitMqPassword, 
String exchangeName) {
-    super(graph);
     this.rabbitMqHost = rabbitMqHost;
     this.rabbitMqPort = rabbitMqPort;
     this.rabbitMqTopic = rabbitMqTopic;
diff --git 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqPublisherSink.java
 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqPublisherSink.java
new file mode 100644
index 000000000..4a603cb88
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqPublisherSink.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.sinks.brokers.jvm.rabbitmq;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
+import org.apache.streampipes.model.DataSinkType;
+import org.apache.streampipes.model.graph.DataSinkDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.pe.shared.PlaceholderExtractor;
+import org.apache.streampipes.sdk.builder.DataSinkBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.helpers.EpRequirements;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
+import org.apache.streampipes.wrapper.standalone.SinkParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RabbitMqPublisherSink extends StreamPipesDataSink {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(RabbitMqPublisherSink.class);
+  private static final String TOPIC_KEY = "topic";
+
+  private static final String HOST_KEY = "host";
+  private static final String PORT_KEY = "port";
+  private static final String USER_KEY = "user";
+  private static final String PASSWORD_KEY = "password";
+  private static final String EXCHANGE_NAME_KEY = "exchange-name";
+
+  private RabbitMqPublisher publisher;
+  private JsonDataFormatDefinition dataFormatDefinition;
+  private String topic;
+
+
+  @Override
+  public DataSinkDescription declareModel() {
+    return 
DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.rabbitmq")
+        .category(DataSinkType.MESSAGING)
+        .withLocales(Locales.EN)
+        .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+        .requiredStream(StreamRequirementsBuilder
+            .create()
+            .requiredProperty(EpRequirements.anyProperty())
+            .build())
+        .requiredTextParameter(Labels.withId(TOPIC_KEY), false, false)
+        .requiredTextParameter(Labels.withId(HOST_KEY), false, false)
+        .requiredIntegerParameter(Labels.withId(PORT_KEY), 5672)
+        .requiredTextParameter(Labels.withId(USER_KEY), false, false)
+        .requiredTextParameter(Labels.withId(EXCHANGE_NAME_KEY), false, false)
+        .requiredSecret(Labels.withId(PASSWORD_KEY))
+        .build();
+  }
+
+  @Override
+  public void onInvocation(SinkParams parameters,
+                           EventSinkRuntimeContext runtimeContext) throws 
SpRuntimeException {
+
+    var extractor = parameters.extractor();
+    this.dataFormatDefinition = new JsonDataFormatDefinition();
+    String publisherTopic = extractor.singleValueParameter(TOPIC_KEY, 
String.class);
+
+    String rabbitMqHost = extractor.singleValueParameter(HOST_KEY, 
String.class);
+    Integer rabbitMqPort = extractor.singleValueParameter(PORT_KEY, 
Integer.class);
+    String rabbitMqUser = extractor.singleValueParameter(USER_KEY, 
String.class);
+    String rabbitMqPassword = extractor.secretValue(PASSWORD_KEY);
+    String exchangeName = extractor.singleValueParameter(EXCHANGE_NAME_KEY, 
String.class);
+
+    var rabbitMqParameters = new RabbitMqParameters(rabbitMqHost, 
rabbitMqPort, publisherTopic,
+        rabbitMqUser, rabbitMqPassword, exchangeName);
+
+    this.publisher = new RabbitMqPublisher(rabbitMqParameters);
+    this.topic = rabbitMqParameters.getRabbitMqTopic();
+
+    if (!this.publisher.isConnected()) {
+      throw new SpRuntimeException("Could not establish conntection to 
RabbitMQ broker. Host: "
+          + rabbitMqParameters.getRabbitMqHost() + " Port: " + 
rabbitMqParameters.getRabbitMqPort());
+    }
+  }
+
+  @Override
+  public void onEvent(Event inputEvent) throws SpRuntimeException {
+    try {
+      publisher.fire(dataFormatDefinition.fromMap(inputEvent.getRaw()),
+          PlaceholderExtractor.replacePlaceholders(inputEvent, topic));
+    } catch (SpRuntimeException e) {
+      LOG.error("Could not serialiaze event");
+    }
+  }
+
+  @Override
+  public void onDetach() throws SpRuntimeException {
+    publisher.cleanup();
+  }
+}
diff --git 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestParameters.java
 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestParameters.java
deleted file mode 100644
index 6dd2c14fd..000000000
--- 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestParameters.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.sinks.brokers.jvm.rest;
-
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
-
-public class RestParameters extends EventSinkBindingParams {
-
-  private String url;
-
-  public RestParameters(DataSinkInvocation graph, String url) {
-    super(graph);
-    this.url = url;
-  }
-
-  public String getUrl() {
-    return url;
-  }
-}
diff --git 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestPublisher.java
 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestPublisher.java
deleted file mode 100644
index 73cf829b5..000000000
--- 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestPublisher.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.sinks.brokers.jvm.rest;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
-import org.apache.streampipes.logging.api.Logger;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
-import org.apache.streampipes.wrapper.runtime.EventSink;
-
-import org.apache.http.client.fluent.Request;
-import org.apache.http.entity.ContentType;
-
-import java.io.IOException;
-
-public class RestPublisher implements EventSink<RestParameters> {
-  private static Logger logger;
-
-  private String url;
-  private JsonDataFormatDefinition jsonDataFormatDefinition;
-
-  @Override
-  public void onInvocation(RestParameters params, EventSinkRuntimeContext 
runtimeContext) throws SpRuntimeException {
-    this.url = params.getUrl();
-    logger = params.getGraph().getLogger(RestPublisher.class);
-    jsonDataFormatDefinition = new JsonDataFormatDefinition();
-  }
-
-  @Override
-  public void onEvent(Event inputEvent) {
-
-    byte[] json = null;
-    try {
-      json = jsonDataFormatDefinition.fromMap(inputEvent.getRaw());
-    } catch (SpRuntimeException e) {
-      logger.error("Error while serializing event: " + 
inputEvent.getSourceInfo().getSourceId() + " Exception: "
-          + e);
-    }
-
-    try {
-      Request.Post(url)
-          .bodyByteArray(json, ContentType.APPLICATION_JSON)
-          .connectTimeout(1000)
-          .socketTimeout(100000)
-          .execute().returnContent().asString();
-    } catch (IOException e) {
-      logger.error("Error while sending data to endpoint: " + url + " 
Exception: " + e);
-    }
-  }
-
-  @Override
-  public void onDetach() throws SpRuntimeException {
-  }
-}
diff --git 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestController.java
 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestSink.java
similarity index 51%
rename from 
streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestController.java
rename to 
streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestSink.java
index c6ccd48d5..583b04fca 100644
--- 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestController.java
+++ 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestSink.java
@@ -19,23 +19,37 @@
 package org.apache.streampipes.sinks.brokers.jvm.rest;
 
 
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
 import org.apache.streampipes.model.DataSinkType;
 import org.apache.streampipes.model.graph.DataSinkDescription;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.sdk.builder.DataSinkBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventSink;
-import 
org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventSinkDeclarer;
+import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
+import org.apache.streampipes.wrapper.standalone.SinkParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
 
-public class RestController extends 
StandaloneEventSinkDeclarer<RestParameters> {
+import org.apache.http.client.fluent.Request;
+import org.apache.http.entity.ContentType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class RestSink extends StreamPipesDataSink {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RestSink.class);
 
   private static final String URL_KEY = "url-key";
 
+  private String url;
+  private JsonDataFormatDefinition jsonDataFormatDefinition;
+
   @Override
   public DataSinkDescription declareModel() {
     return 
DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.rest")
@@ -52,13 +66,35 @@ public class RestController extends 
StandaloneEventSinkDeclarer<RestParameters>
   }
 
   @Override
-  public ConfiguredEventSink<RestParameters> onInvocation(DataSinkInvocation 
graph,
-                                                          
DataSinkParameterExtractor extractor) {
+  public void onDetach() throws SpRuntimeException {
 
-    String url = extractor.singleValueParameter(URL_KEY, String.class);
+  }
 
-    RestParameters params = new RestParameters(graph, url);
+  @Override
+  public void onInvocation(SinkParams parameters,
+                           EventSinkRuntimeContext runtimeContext) throws 
SpRuntimeException {
+    jsonDataFormatDefinition = new JsonDataFormatDefinition();
+    url = parameters.extractor().singleValueParameter(URL_KEY, String.class);
+  }
+
+  @Override
+  public void onEvent(Event inputEvent) throws SpRuntimeException {
+    byte[] json = null;
+    try {
+      json = jsonDataFormatDefinition.fromMap(inputEvent.getRaw());
+    } catch (SpRuntimeException e) {
+      LOG.error("Error while serializing event: " + 
inputEvent.getSourceInfo().getSourceId() + " Exception: "
+          + e);
+    }
 
-    return new ConfiguredEventSink<>(params, RestPublisher::new);
+    try {
+      Request.Post(url)
+          .bodyByteArray(json, ContentType.APPLICATION_JSON)
+          .connectTimeout(1000)
+          .socketTimeout(100000)
+          .execute().returnContent().asString();
+    } catch (IOException e) {
+      LOG.error("Error while sending data to endpoint: " + url + " Exception: 
" + e);
+    }
   }
 }

Reply via email to