This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch 3927-migrate-all-data-sinks-from-streampipesdatasink-to-istreampipesdatasink
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/3927-migrate-all-data-sinks-from-streampipesdatasink-to-istreampipesdatasink by this push:
     new 29eb5035ee refactor(#3927): Update data sink implementations to use new configuration interface
29eb5035ee is described below

commit 29eb5035ee5d27e831112a3016b148d90cdb0b25
Author: Philipp Zehnder <[email protected]>
AuthorDate: Fri Nov 14 07:38:17 2025 +0100

    refactor(#3927): Update data sink implementations to use new configuration interface
---
 .../connectors/influx/sink/InfluxDbSink.java       | 56 ++++++++++++----------
 .../jvm/bufferrest/BufferRestPublisherSink.java    | 48 ++++++++++---------
 .../sinks/brokers/jvm/jms/JmsPublisherSink.java    | 47 ++++++++++--------
 .../jvm/rabbitmq/RabbitMqPublisherSink.java        | 51 +++++++++++---------
 4 files changed, 110 insertions(+), 92 deletions(-)

diff --git 
a/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/sink/InfluxDbSink.java
 
b/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/sink/InfluxDbSink.java
index 3da9bc629c..6cca84df54 100644
--- 
a/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/sink/InfluxDbSink.java
+++ 
b/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/sink/InfluxDbSink.java
@@ -19,20 +19,21 @@
 package org.apache.streampipes.extensions.connectors.influx.sink;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
+import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
 import 
org.apache.streampipes.extensions.connectors.influx.shared.InfluxConfigs;
 import org.apache.streampipes.model.DataSinkType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataSinkDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.sdk.builder.DataSinkBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,16 +43,38 @@ import static 
org.apache.streampipes.extensions.connectors.influx.shared.InfluxK
 import static 
org.apache.streampipes.extensions.connectors.influx.shared.InfluxKeys.MAX_FLUSH_DURATION_KEY;
 import static 
org.apache.streampipes.extensions.connectors.influx.shared.InfluxKeys.TIMESTAMP_MAPPING_KEY;
 
-public class InfluxDbSink extends StreamPipesDataSink {
+public class InfluxDbSink implements IStreamPipesDataSink {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(InfluxDbSink.class);
 
   private InfluxDbClient influxDbClient;
 
+  @Override
+  public IDataSinkConfiguration declareConfig() {
+    var builder = 
DataSinkBuilder.create("org.apache.streampipes.sinks.databases.jvm.influxdb", 0)
+        .withLocales(Locales.EN)
+        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
+        .category(DataSinkType.DATABASE);
+
+    InfluxConfigs.appendSharedInfluxConfig(builder);
+
+    
builder.requiredStream(StreamRequirementsBuilder.create().requiredPropertyWithUnaryMapping(
+            EpRequirements.timestampReq(),
+            Labels.withId(TIMESTAMP_MAPPING_KEY),
+            PropertyScope.NONE).build())
+        .requiredIntegerParameter(Labels.withId(BATCH_INTERVAL_ACTIONS_KEY))
+        .requiredIntegerParameter(Labels.withId(MAX_FLUSH_DURATION_KEY), 2000);
+
+    return DataSinkConfiguration.create(
+        InfluxDbSink::new,
+        builder.build()
+    );
+  }
 
   @Override
-  public void onInvocation(SinkParams parameters,
-                           EventSinkRuntimeContext runtimeContext) throws 
SpRuntimeException {
+  public void onPipelineStarted(
+      IDataSinkParameters parameters,
+      EventSinkRuntimeContext runtimeContext) {
 
     var extractor = parameters.extractor();
 
@@ -82,30 +105,11 @@ public class InfluxDbSink extends StreamPipesDataSink {
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
+  public void onPipelineStopped() {
     influxDbClient.disconnect();
   }
 
   public static String prepareString(String s) {
     return s.toLowerCase().replaceAll("[^a-zA-Z0-9]", "");
   }
-
-  @Override
-  public DataSinkDescription declareModel() {
-    var builder = 
DataSinkBuilder.create("org.apache.streampipes.sinks.databases.jvm.influxdb", 0)
-        .withLocales(Locales.EN)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .category(DataSinkType.DATABASE);
-
-    InfluxConfigs.appendSharedInfluxConfig(builder);
-
-    
builder.requiredStream(StreamRequirementsBuilder.create().requiredPropertyWithUnaryMapping(
-            EpRequirements.timestampReq(),
-            Labels.withId(TIMESTAMP_MAPPING_KEY),
-            PropertyScope.NONE).build())
-        .requiredIntegerParameter(Labels.withId(BATCH_INTERVAL_ACTIONS_KEY))
-        .requiredIntegerParameter(Labels.withId(MAX_FLUSH_DURATION_KEY), 2000);
-
-    return builder.build();
-  }
 }
diff --git 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/bufferrest/BufferRestPublisherSink.java
 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/bufferrest/BufferRestPublisherSink.java
index 95b794e5aa..2eeb90db98 100644
--- 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/bufferrest/BufferRestPublisherSink.java
+++ 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/bufferrest/BufferRestPublisherSink.java
@@ -21,20 +21,21 @@ package org.apache.streampipes.sinks.brokers.jvm.bufferrest;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.dataformat.JsonDataFormatDefinition;
 import org.apache.streampipes.dataformat.SpDataFormatDefinition;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
+import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
 import org.apache.streampipes.model.DataSinkType;
-import org.apache.streampipes.model.graph.DataSinkDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.sdk.builder.DataSinkBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import 
org.apache.streampipes.sinks.brokers.jvm.bufferrest.buffer.BufferListener;
 import 
org.apache.streampipes.sinks.brokers.jvm.bufferrest.buffer.MessageBuffer;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
 
 import org.apache.commons.io.Charsets;
 import org.apache.http.client.fluent.Request;
@@ -46,7 +47,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
-public class BufferRestPublisherSink extends StreamPipesDataSink implements 
BufferListener {
+public class BufferRestPublisherSink implements IStreamPipesDataSink, 
BufferListener {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(BufferRestPublisherSink.class);
 
@@ -61,26 +62,29 @@ public class BufferRestPublisherSink extends 
StreamPipesDataSink implements Buff
   private MessageBuffer buffer;
 
   @Override
-  public DataSinkDescription declareModel() {
-    return DataSinkBuilder
-        .create("org.apache.streampipes.sinks.brokers.jvm.bufferrest", 0)
-        .category(DataSinkType.NOTIFICATION)
-        .withLocales(Locales.EN)
-        .requiredStream(StreamRequirementsBuilder
-            .create()
-            .requiredPropertyWithNaryMapping(EpRequirements.anyProperty(), 
Labels.withId(
-                KEY + FIELDS), PropertyScope.NONE)
-            .build())
-        .requiredTextParameter(Labels.from(KEY + URI, "REST Endpoint URI", 
"REST Endpoint URI"))
-        .requiredIntegerParameter(Labels.from(KEY + COUNT, "Buffered Event 
Count",
-                "Number (1 <= x <= 1000000) of incoming events before sending 
data on to the given REST endpoint"),
-            1, 1000000, 1)
-        .build();
+  public IDataSinkConfiguration declareConfig() {
+    return DataSinkConfiguration.create(
+        BufferRestPublisherSink::new,
+        DataSinkBuilder
+            .create("org.apache.streampipes.sinks.brokers.jvm.bufferrest", 0)
+            .category(DataSinkType.NOTIFICATION)
+            .withLocales(Locales.EN)
+            .requiredStream(StreamRequirementsBuilder
+                .create()
+                .requiredPropertyWithNaryMapping(EpRequirements.anyProperty(), 
Labels.withId(
+                    KEY + FIELDS), PropertyScope.NONE)
+                .build())
+            .requiredTextParameter(Labels.from(KEY + URI, "REST Endpoint URI", 
"REST Endpoint URI"))
+            .requiredIntegerParameter(Labels.from(KEY + COUNT, "Buffered Event 
Count",
+                    "Number (1 <= x <= 1000000) of incoming events before 
sending data on to the given REST endpoint"),
+                1, 1000000, 1)
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(SinkParams parameters,
-                           EventSinkRuntimeContext runtimeContext) throws 
SpRuntimeException {
+  public void onPipelineStarted(IDataSinkParameters parameters,
+                                EventSinkRuntimeContext runtimeContext) {
 
     var extractor = parameters.extractor();
     fieldsToSend = extractor.mappingPropertyValues(KEY + FIELDS);
@@ -104,7 +108,7 @@ public class BufferRestPublisherSink extends 
StreamPipesDataSink implements Buff
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
+  public void onPipelineStopped() {
     buffer.removeListener(this);
   }
 
diff --git 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsPublisherSink.java
 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsPublisherSink.java
index 3d2fa15cbb..ce8ff8a22c 100644
--- 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsPublisherSink.java
+++ 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsPublisherSink.java
@@ -21,24 +21,25 @@ package org.apache.streampipes.sinks.brokers.jvm.jms;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.dataformat.JsonDataFormatDefinition;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
+import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
 import org.apache.streampipes.messaging.jms.ActiveMQPublisher;
 import org.apache.streampipes.model.DataSinkType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataSinkDescription;
 import org.apache.streampipes.model.grounding.JmsTransportProtocol;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.sdk.builder.DataSinkBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
 
 import java.util.Map;
 
-public class JmsPublisherSink extends StreamPipesDataSink {
+public class JmsPublisherSink implements IStreamPipesDataSink {
 
   private static final String TOPIC_KEY = "topic";
   private static final String HOST_KEY = "host";
@@ -48,24 +49,28 @@ public class JmsPublisherSink extends StreamPipesDataSink {
   private JsonDataFormatDefinition jsonDataFormatDefinition;
 
   @Override
-  public DataSinkDescription declareModel() {
-    return 
DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.jms", 0)
-        .category(DataSinkType.MESSAGING)
-        .withLocales(Locales.EN)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .requiredStream(StreamRequirementsBuilder
-            .create()
-            .requiredProperty(EpRequirements.anyProperty())
-            .build())
-        .requiredTextParameter(Labels.withId(TOPIC_KEY), false, false)
-        .requiredTextParameter(Labels.withId(HOST_KEY), false, false)
-        .requiredIntegerParameter(Labels.withId(PORT_KEY), 61616)
-        .build();
+  public IDataSinkConfiguration declareConfig() {
+    return DataSinkConfiguration.create(
+        JmsPublisherSink::new,
+        DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.jms", 
0)
+            .category(DataSinkType.MESSAGING)
+            .withLocales(Locales.EN)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .requiredStream(StreamRequirementsBuilder
+                .create()
+                .requiredProperty(EpRequirements.anyProperty())
+                .build())
+            .requiredTextParameter(Labels.withId(TOPIC_KEY), false, false)
+            .requiredTextParameter(Labels.withId(HOST_KEY), false, false)
+            .requiredIntegerParameter(Labels.withId(PORT_KEY), 61616)
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(SinkParams parameters,
-                           EventSinkRuntimeContext runtimeContext) throws 
SpRuntimeException {
+  public void onPipelineStarted(
+      IDataSinkParameters parameters,
+      EventSinkRuntimeContext runtimeContext) {
 
     var extractor = parameters.extractor();
     this.jsonDataFormatDefinition = new JsonDataFormatDefinition();
@@ -93,12 +98,12 @@ public class JmsPublisherSink extends StreamPipesDataSink {
       Map<String, Object> event = inputEvent.getRaw();
       this.publisher.publish(jsonDataFormatDefinition.fromMap(event));
     } catch (SpRuntimeException e) {
-      e.printStackTrace();
+      throw e;
     }
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
+  public void onPipelineStopped() {
     this.publisher.disconnect();
   }
 }
diff --git 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqPublisherSink.java
 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqPublisherSink.java
index 2e7de12842..6404005082 100644
--- 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqPublisherSink.java
+++ 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqPublisherSink.java
@@ -20,24 +20,25 @@ package org.apache.streampipes.sinks.brokers.jvm.rabbitmq;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.dataformat.JsonDataFormatDefinition;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
+import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
 import org.apache.streampipes.model.DataSinkType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataSinkDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.pe.shared.PlaceholderExtractor;
 import org.apache.streampipes.sdk.builder.DataSinkBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class RabbitMqPublisherSink extends StreamPipesDataSink {
+public class RabbitMqPublisherSink implements IStreamPipesDataSink {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(RabbitMqPublisherSink.class);
   private static final String TOPIC_KEY = "topic";
@@ -54,27 +55,31 @@ public class RabbitMqPublisherSink extends 
StreamPipesDataSink {
 
 
   @Override
-  public DataSinkDescription declareModel() {
-    return 
DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.rabbitmq", 0)
-        .category(DataSinkType.MESSAGING)
-        .withLocales(Locales.EN)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .requiredStream(StreamRequirementsBuilder
-            .create()
-            .requiredProperty(EpRequirements.anyProperty())
-            .build())
-        .requiredTextParameter(Labels.withId(TOPIC_KEY), false, false)
-        .requiredTextParameter(Labels.withId(HOST_KEY), false, false)
-        .requiredIntegerParameter(Labels.withId(PORT_KEY), 5672)
-        .requiredTextParameter(Labels.withId(USER_KEY), false, false)
-        .requiredTextParameter(Labels.withId(EXCHANGE_NAME_KEY), false, false)
-        .requiredSecret(Labels.withId(PASSWORD_KEY))
-        .build();
+  public IDataSinkConfiguration declareConfig() {
+    return DataSinkConfiguration.create(
+        RabbitMqPublisherSink::new,
+        
DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.rabbitmq", 0)
+            .category(DataSinkType.MESSAGING)
+            .withLocales(Locales.EN)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .requiredStream(StreamRequirementsBuilder
+                .create()
+                .requiredProperty(EpRequirements.anyProperty())
+                .build())
+            .requiredTextParameter(Labels.withId(TOPIC_KEY), false, false)
+            .requiredTextParameter(Labels.withId(HOST_KEY), false, false)
+            .requiredIntegerParameter(Labels.withId(PORT_KEY), 5672)
+            .requiredTextParameter(Labels.withId(USER_KEY), false, false)
+            .requiredTextParameter(Labels.withId(EXCHANGE_NAME_KEY), false, 
false)
+            .requiredSecret(Labels.withId(PASSWORD_KEY))
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(SinkParams parameters,
-                           EventSinkRuntimeContext runtimeContext) throws 
SpRuntimeException {
+  public void onPipelineStarted(
+      IDataSinkParameters parameters,
+      EventSinkRuntimeContext runtimeContext) {
 
     var extractor = parameters.extractor();
     this.dataFormatDefinition = new JsonDataFormatDefinition();
@@ -109,7 +114,7 @@ public class RabbitMqPublisherSink extends 
StreamPipesDataSink {
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
+  public void onPipelineStopped() {
     publisher.cleanup();
   }
 }

Reply via email to