This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch
3927-migrate-all-data-sinks-from-streampipesdatasink-to-istreampipesdatasink
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to
refs/heads/3927-migrate-all-data-sinks-from-streampipesdatasink-to-istreampipesdatasink
by this push:
new 29eb5035ee refactor(#3927): Update data sink implementations to use
new configuration interface
29eb5035ee is described below
commit 29eb5035ee5d27e831112a3016b148d90cdb0b25
Author: Philipp Zehnder <[email protected]>
AuthorDate: Fri Nov 14 07:38:17 2025 +0100
refactor(#3927): Update data sink implementations to use new configuration
interface
---
.../connectors/influx/sink/InfluxDbSink.java | 56 ++++++++++++----------
.../jvm/bufferrest/BufferRestPublisherSink.java | 48 ++++++++++---------
.../sinks/brokers/jvm/jms/JmsPublisherSink.java | 47 ++++++++++--------
.../jvm/rabbitmq/RabbitMqPublisherSink.java | 51 +++++++++++---------
4 files changed, 110 insertions(+), 92 deletions(-)
diff --git
a/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/sink/InfluxDbSink.java
b/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/sink/InfluxDbSink.java
index 3da9bc629c..6cca84df54 100644
---
a/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/sink/InfluxDbSink.java
+++
b/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/sink/InfluxDbSink.java
@@ -19,20 +19,21 @@
package org.apache.streampipes.extensions.connectors.influx.sink;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
+import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
import
org.apache.streampipes.extensions.connectors.influx.shared.InfluxConfigs;
import org.apache.streampipes.model.DataSinkType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataSinkDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,16 +43,38 @@ import static
org.apache.streampipes.extensions.connectors.influx.shared.InfluxK
import static
org.apache.streampipes.extensions.connectors.influx.shared.InfluxKeys.MAX_FLUSH_DURATION_KEY;
import static
org.apache.streampipes.extensions.connectors.influx.shared.InfluxKeys.TIMESTAMP_MAPPING_KEY;
-public class InfluxDbSink extends StreamPipesDataSink {
+public class InfluxDbSink implements IStreamPipesDataSink {
private static final Logger LOG =
LoggerFactory.getLogger(InfluxDbSink.class);
private InfluxDbClient influxDbClient;
+ @Override
+ public IDataSinkConfiguration declareConfig() {
+ var builder =
DataSinkBuilder.create("org.apache.streampipes.sinks.databases.jvm.influxdb", 0)
+ .withLocales(Locales.EN)
+ .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
+ .category(DataSinkType.DATABASE);
+
+ InfluxConfigs.appendSharedInfluxConfig(builder);
+
+
builder.requiredStream(StreamRequirementsBuilder.create().requiredPropertyWithUnaryMapping(
+ EpRequirements.timestampReq(),
+ Labels.withId(TIMESTAMP_MAPPING_KEY),
+ PropertyScope.NONE).build())
+ .requiredIntegerParameter(Labels.withId(BATCH_INTERVAL_ACTIONS_KEY))
+ .requiredIntegerParameter(Labels.withId(MAX_FLUSH_DURATION_KEY), 2000);
+
+ return DataSinkConfiguration.create(
+ InfluxDbSink::new,
+ builder.build()
+ );
+ }
@Override
- public void onInvocation(SinkParams parameters,
- EventSinkRuntimeContext runtimeContext) throws
SpRuntimeException {
+ public void onPipelineStarted(
+ IDataSinkParameters parameters,
+ EventSinkRuntimeContext runtimeContext) {
var extractor = parameters.extractor();
@@ -82,30 +105,11 @@ public class InfluxDbSink extends StreamPipesDataSink {
}
@Override
- public void onDetach() throws SpRuntimeException {
+ public void onPipelineStopped() {
influxDbClient.disconnect();
}
public static String prepareString(String s) {
return s.toLowerCase().replaceAll("[^a-zA-Z0-9]", "");
}
-
- @Override
- public DataSinkDescription declareModel() {
- var builder =
DataSinkBuilder.create("org.apache.streampipes.sinks.databases.jvm.influxdb", 0)
- .withLocales(Locales.EN)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .category(DataSinkType.DATABASE);
-
- InfluxConfigs.appendSharedInfluxConfig(builder);
-
-
builder.requiredStream(StreamRequirementsBuilder.create().requiredPropertyWithUnaryMapping(
- EpRequirements.timestampReq(),
- Labels.withId(TIMESTAMP_MAPPING_KEY),
- PropertyScope.NONE).build())
- .requiredIntegerParameter(Labels.withId(BATCH_INTERVAL_ACTIONS_KEY))
- .requiredIntegerParameter(Labels.withId(MAX_FLUSH_DURATION_KEY), 2000);
-
- return builder.build();
- }
}
diff --git
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/bufferrest/BufferRestPublisherSink.java
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/bufferrest/BufferRestPublisherSink.java
index 95b794e5aa..2eeb90db98 100644
---
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/bufferrest/BufferRestPublisherSink.java
+++
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/bufferrest/BufferRestPublisherSink.java
@@ -21,20 +21,21 @@ package org.apache.streampipes.sinks.brokers.jvm.bufferrest;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.dataformat.JsonDataFormatDefinition;
import org.apache.streampipes.dataformat.SpDataFormatDefinition;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
+import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
import org.apache.streampipes.model.DataSinkType;
-import org.apache.streampipes.model.graph.DataSinkDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import
org.apache.streampipes.sinks.brokers.jvm.bufferrest.buffer.BufferListener;
import
org.apache.streampipes.sinks.brokers.jvm.bufferrest.buffer.MessageBuffer;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
import org.apache.commons.io.Charsets;
import org.apache.http.client.fluent.Request;
@@ -46,7 +47,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
-public class BufferRestPublisherSink extends StreamPipesDataSink implements
BufferListener {
+public class BufferRestPublisherSink implements IStreamPipesDataSink,
BufferListener {
private static final Logger LOG =
LoggerFactory.getLogger(BufferRestPublisherSink.class);
@@ -61,26 +62,29 @@ public class BufferRestPublisherSink extends
StreamPipesDataSink implements Buff
private MessageBuffer buffer;
@Override
- public DataSinkDescription declareModel() {
- return DataSinkBuilder
- .create("org.apache.streampipes.sinks.brokers.jvm.bufferrest", 0)
- .category(DataSinkType.NOTIFICATION)
- .withLocales(Locales.EN)
- .requiredStream(StreamRequirementsBuilder
- .create()
- .requiredPropertyWithNaryMapping(EpRequirements.anyProperty(),
Labels.withId(
- KEY + FIELDS), PropertyScope.NONE)
- .build())
- .requiredTextParameter(Labels.from(KEY + URI, "REST Endpoint URI",
"REST Endpoint URI"))
- .requiredIntegerParameter(Labels.from(KEY + COUNT, "Buffered Event
Count",
- "Number (1 <= x <= 1000000) of incoming events before sending
data on to the given REST endpoint"),
- 1, 1000000, 1)
- .build();
+ public IDataSinkConfiguration declareConfig() {
+ return DataSinkConfiguration.create(
+ BufferRestPublisherSink::new,
+ DataSinkBuilder
+ .create("org.apache.streampipes.sinks.brokers.jvm.bufferrest", 0)
+ .category(DataSinkType.NOTIFICATION)
+ .withLocales(Locales.EN)
+ .requiredStream(StreamRequirementsBuilder
+ .create()
+ .requiredPropertyWithNaryMapping(EpRequirements.anyProperty(),
Labels.withId(
+ KEY + FIELDS), PropertyScope.NONE)
+ .build())
+ .requiredTextParameter(Labels.from(KEY + URI, "REST Endpoint URI",
"REST Endpoint URI"))
+ .requiredIntegerParameter(Labels.from(KEY + COUNT, "Buffered Event
Count",
+ "Number (1 <= x <= 1000000) of incoming events before
sending data on to the given REST endpoint"),
+ 1, 1000000, 1)
+ .build()
+ );
}
@Override
- public void onInvocation(SinkParams parameters,
- EventSinkRuntimeContext runtimeContext) throws
SpRuntimeException {
+ public void onPipelineStarted(IDataSinkParameters parameters,
+ EventSinkRuntimeContext runtimeContext) {
var extractor = parameters.extractor();
fieldsToSend = extractor.mappingPropertyValues(KEY + FIELDS);
@@ -104,7 +108,7 @@ public class BufferRestPublisherSink extends
StreamPipesDataSink implements Buff
}
@Override
- public void onDetach() throws SpRuntimeException {
+ public void onPipelineStopped() {
buffer.removeListener(this);
}
diff --git
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsPublisherSink.java
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsPublisherSink.java
index 3d2fa15cbb..ce8ff8a22c 100644
---
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsPublisherSink.java
+++
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsPublisherSink.java
@@ -21,24 +21,25 @@ package org.apache.streampipes.sinks.brokers.jvm.jms;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.dataformat.JsonDataFormatDefinition;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
+import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
import org.apache.streampipes.messaging.jms.ActiveMQPublisher;
import org.apache.streampipes.model.DataSinkType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataSinkDescription;
import org.apache.streampipes.model.grounding.JmsTransportProtocol;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
import java.util.Map;
-public class JmsPublisherSink extends StreamPipesDataSink {
+public class JmsPublisherSink implements IStreamPipesDataSink {
private static final String TOPIC_KEY = "topic";
private static final String HOST_KEY = "host";
@@ -48,24 +49,28 @@ public class JmsPublisherSink extends StreamPipesDataSink {
private JsonDataFormatDefinition jsonDataFormatDefinition;
@Override
- public DataSinkDescription declareModel() {
- return
DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.jms", 0)
- .category(DataSinkType.MESSAGING)
- .withLocales(Locales.EN)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .requiredStream(StreamRequirementsBuilder
- .create()
- .requiredProperty(EpRequirements.anyProperty())
- .build())
- .requiredTextParameter(Labels.withId(TOPIC_KEY), false, false)
- .requiredTextParameter(Labels.withId(HOST_KEY), false, false)
- .requiredIntegerParameter(Labels.withId(PORT_KEY), 61616)
- .build();
+ public IDataSinkConfiguration declareConfig() {
+ return DataSinkConfiguration.create(
+ JmsPublisherSink::new,
+ DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.jms",
0)
+ .category(DataSinkType.MESSAGING)
+ .withLocales(Locales.EN)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .requiredStream(StreamRequirementsBuilder
+ .create()
+ .requiredProperty(EpRequirements.anyProperty())
+ .build())
+ .requiredTextParameter(Labels.withId(TOPIC_KEY), false, false)
+ .requiredTextParameter(Labels.withId(HOST_KEY), false, false)
+ .requiredIntegerParameter(Labels.withId(PORT_KEY), 61616)
+ .build()
+ );
}
@Override
- public void onInvocation(SinkParams parameters,
- EventSinkRuntimeContext runtimeContext) throws
SpRuntimeException {
+ public void onPipelineStarted(
+ IDataSinkParameters parameters,
+ EventSinkRuntimeContext runtimeContext) {
var extractor = parameters.extractor();
this.jsonDataFormatDefinition = new JsonDataFormatDefinition();
@@ -93,12 +98,12 @@ public class JmsPublisherSink extends StreamPipesDataSink {
Map<String, Object> event = inputEvent.getRaw();
this.publisher.publish(jsonDataFormatDefinition.fromMap(event));
} catch (SpRuntimeException e) {
- e.printStackTrace();
+ throw e;
}
}
@Override
- public void onDetach() throws SpRuntimeException {
+ public void onPipelineStopped() {
this.publisher.disconnect();
}
}
diff --git
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqPublisherSink.java
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqPublisherSink.java
index 2e7de12842..6404005082 100644
---
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqPublisherSink.java
+++
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqPublisherSink.java
@@ -20,24 +20,25 @@ package org.apache.streampipes.sinks.brokers.jvm.rabbitmq;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.dataformat.JsonDataFormatDefinition;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
+import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
import org.apache.streampipes.model.DataSinkType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataSinkDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.pe.shared.PlaceholderExtractor;
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class RabbitMqPublisherSink extends StreamPipesDataSink {
+public class RabbitMqPublisherSink implements IStreamPipesDataSink {
private static final Logger LOG =
LoggerFactory.getLogger(RabbitMqPublisherSink.class);
private static final String TOPIC_KEY = "topic";
@@ -54,27 +55,31 @@ public class RabbitMqPublisherSink extends
StreamPipesDataSink {
@Override
- public DataSinkDescription declareModel() {
- return
DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.rabbitmq", 0)
- .category(DataSinkType.MESSAGING)
- .withLocales(Locales.EN)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .requiredStream(StreamRequirementsBuilder
- .create()
- .requiredProperty(EpRequirements.anyProperty())
- .build())
- .requiredTextParameter(Labels.withId(TOPIC_KEY), false, false)
- .requiredTextParameter(Labels.withId(HOST_KEY), false, false)
- .requiredIntegerParameter(Labels.withId(PORT_KEY), 5672)
- .requiredTextParameter(Labels.withId(USER_KEY), false, false)
- .requiredTextParameter(Labels.withId(EXCHANGE_NAME_KEY), false, false)
- .requiredSecret(Labels.withId(PASSWORD_KEY))
- .build();
+ public IDataSinkConfiguration declareConfig() {
+ return DataSinkConfiguration.create(
+ RabbitMqPublisherSink::new,
+
DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.rabbitmq", 0)
+ .category(DataSinkType.MESSAGING)
+ .withLocales(Locales.EN)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .requiredStream(StreamRequirementsBuilder
+ .create()
+ .requiredProperty(EpRequirements.anyProperty())
+ .build())
+ .requiredTextParameter(Labels.withId(TOPIC_KEY), false, false)
+ .requiredTextParameter(Labels.withId(HOST_KEY), false, false)
+ .requiredIntegerParameter(Labels.withId(PORT_KEY), 5672)
+ .requiredTextParameter(Labels.withId(USER_KEY), false, false)
+ .requiredTextParameter(Labels.withId(EXCHANGE_NAME_KEY), false,
false)
+ .requiredSecret(Labels.withId(PASSWORD_KEY))
+ .build()
+ );
}
@Override
- public void onInvocation(SinkParams parameters,
- EventSinkRuntimeContext runtimeContext) throws
SpRuntimeException {
+ public void onPipelineStarted(
+ IDataSinkParameters parameters,
+ EventSinkRuntimeContext runtimeContext) {
var extractor = parameters.extractor();
this.dataFormatDefinition = new JsonDataFormatDefinition();
@@ -109,7 +114,7 @@ public class RabbitMqPublisherSink extends
StreamPipesDataSink {
}
@Override
- public void onDetach() throws SpRuntimeException {
+ public void onPipelineStopped() {
publisher.cleanup();
}
}