This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch 3927-migrate-all-data-sinks-from-streampipesdatasink-to-istreampipesdatasink
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/3927-migrate-all-data-sinks-from-streampipesdatasink-to-istreampipesdatasink by this push:
     new 239a5ee48e refactor(#3927): Update data sink implementations to use new configuration interface
239a5ee48e is described below

commit 239a5ee48e2aace8924d45c56fb9e1245393bb4a
Author: Philipp Zehnder <[email protected]>
AuthorDate: Fri Nov 14 08:18:15 2025 +0100

    refactor(#3927): Update data sink implementations to use new configuration interface
---
 .../databases/jvm/postgresql/PostgreSqlSink.java   | 59 +++++++++++---------
 .../sinks/databases/jvm/redis/RedisSink.java       | 64 ++++++++++++----------
 .../sinks/databases/jvm/tsfile/TsFileSink.java     | 54 +++++++++---------
 .../sinks/internal/jvm/datalake/DataLakeSink.java  | 62 +++++++++++----------
 .../InternalStreamPipesNotificationSink.java       | 23 ++++++--
 .../standalone/StreamPipesNotificationSink.java    | 50 ++++++++---------
 6 files changed, 169 insertions(+), 143 deletions(-)

diff --git 
a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlSink.java
 
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlSink.java
index 9643d18426..3c321640cb 100644
--- 
a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlSink.java
+++ 
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlSink.java
@@ -19,22 +19,23 @@
 package org.apache.streampipes.sinks.databases.jvm.postgresql;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
+import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
 import org.apache.streampipes.model.DataSinkType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataSinkDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.sdk.builder.DataSinkBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.Options;
 import org.apache.streampipes.sdk.helpers.Tuple2;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
 
-public class PostgreSqlSink extends StreamPipesDataSink {
+public class PostgreSqlSink implements IStreamPipesDataSink {
 
   private static final String DATABASE_HOST_KEY = "db_host";
   private static final String DATABASE_PORT_KEY = "db_port";
@@ -49,31 +50,35 @@ public class PostgreSqlSink extends StreamPipesDataSink {
   private PostgreSql postgreSql;
 
   @Override
-  public DataSinkDescription declareModel() {
-    return DataSinkBuilder
-        .create("org.apache.streampipes.sinks.databases.jvm.postgresql", 0)
-        .withLocales(Locales.EN)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .category(DataSinkType.DATABASE)
-        .requiredStream(StreamRequirementsBuilder.create()
-            .requiredProperty(EpRequirements.anyProperty())
-            .build())
-        .requiredTextParameter(Labels.withId(DATABASE_HOST_KEY))
-        .requiredIntegerParameter(Labels.withId(DATABASE_PORT_KEY), 5432)
-        .requiredTextParameter(Labels.withId(DATABASE_NAME_KEY))
-        .requiredTextParameter(Labels.withId(DATABASE_TABLE_KEY))
-        .requiredTextParameter(Labels.withId(DATABASE_USER_KEY))
-        .requiredSecret(Labels.withId(DATABASE_PASSWORD_KEY))
-        .requiredSingleValueSelection(Labels.withId(SSL_MODE),
-            Options.from(
-                new Tuple2<>("Yes", SSL_ENABLED),
-                new Tuple2<>("No", SSL_DISABLED)))
-        .build();
+  public IDataSinkConfiguration declareConfig() {
+    return DataSinkConfiguration.create(
+        PostgreSqlSink::new,
+        DataSinkBuilder
+            .create("org.apache.streampipes.sinks.databases.jvm.postgresql", 0)
+            .withLocales(Locales.EN)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .category(DataSinkType.DATABASE)
+            .requiredStream(StreamRequirementsBuilder.create()
+                .requiredProperty(EpRequirements.anyProperty())
+                .build())
+            .requiredTextParameter(Labels.withId(DATABASE_HOST_KEY))
+            .requiredIntegerParameter(Labels.withId(DATABASE_PORT_KEY), 5432)
+            .requiredTextParameter(Labels.withId(DATABASE_NAME_KEY))
+            .requiredTextParameter(Labels.withId(DATABASE_TABLE_KEY))
+            .requiredTextParameter(Labels.withId(DATABASE_USER_KEY))
+            .requiredSecret(Labels.withId(DATABASE_PASSWORD_KEY))
+            .requiredSingleValueSelection(Labels.withId(SSL_MODE),
+                Options.from(
+                    new Tuple2<>("Yes", SSL_ENABLED),
+                    new Tuple2<>("No", SSL_DISABLED)))
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(SinkParams parameters,
-                           EventSinkRuntimeContext runtimeContext) throws 
SpRuntimeException {
+  public void onPipelineStarted(
+      IDataSinkParameters parameters,
+      EventSinkRuntimeContext runtimeContext) {
     var extractor = parameters.extractor();
     String hostname = extractor.singleValueParameter(DATABASE_HOST_KEY, 
String.class);
     Integer port = extractor.singleValueParameter(DATABASE_PORT_KEY, 
Integer.class);
@@ -103,7 +108,7 @@ public class PostgreSqlSink extends StreamPipesDataSink {
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
+  public void onPipelineStopped() {
     postgreSql.onDetach();
   }
 }
diff --git 
a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/redis/RedisSink.java
 
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/redis/RedisSink.java
index b8c0282389..b0d6ca809a 100644
--- 
a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/redis/RedisSink.java
+++ 
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/redis/RedisSink.java
@@ -19,22 +19,23 @@
 package org.apache.streampipes.sinks.databases.jvm.redis;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
+import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
 import org.apache.streampipes.model.DataSinkType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataSinkDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.sdk.builder.DataSinkBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.Options;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
 
-public class RedisSink extends StreamPipesDataSink {
+public class RedisSink implements IStreamPipesDataSink {
 
   private static final String EVENT_PRIMARY_KEY = "event_pk";
 
@@ -63,34 +64,37 @@ public class RedisSink extends StreamPipesDataSink {
   private Redis redis;
 
   @Override
-  public DataSinkDescription declareModel() {
-    return DataSinkBuilder
-        .create("org.apache.streampipes.sinks.databases.jvm.redis", 0)
-        .withLocales(Locales.EN)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .category(DataSinkType.DATABASE)
-        .requiredStream(StreamRequirementsBuilder
-            .create()
-            .requiredPropertyWithUnaryMapping(EpRequirements.anyProperty(),
-                Labels.withId(EVENT_PRIMARY_KEY),
-                PropertyScope.NONE).build())
-        .requiredSingleValueSelection(Labels.withId(EVENT_KEY_AUTO_INCREMENT), 
Options.from("False", "True"))
-        .requiredTextParameter(Labels.withId(REDIS_HOST_KEY))
-        .requiredIntegerParameter(Labels.withId(REDIS_PORT_KEY), 6379)
-        .requiredIntegerParameter(Labels.withId(EVENT_TTL_KEY), -1)
-        // TODO: Use this after optional parameters implementation
-        //  .requiredSecret(Labels.withId(REDIS_PASSWORD_KEY))
-        //  .requiredTextParameter(Labels.withId(REDIS_CLIENT_KEY))
-        .requiredIntegerParameter(Labels.withId(REDIS_INDEX_KEY), -1)
-        .requiredIntegerParameter(Labels.withId(REDIS_POOL_MAX_ACTIVE_KEY), 8)
-        .requiredIntegerParameter(Labels.withId(REDIS_POOL_MAX_IDLE_KEY), 8)
-        .requiredIntegerParameter(Labels.withId(REDIS_POOL_MAX_WAIT_KEY), -1)
-        .requiredIntegerParameter(Labels.withId(REDIS_POOL_TIMEOUT_KEY), 2000)
-        .build();
+  public IDataSinkConfiguration declareConfig() {
+    return DataSinkConfiguration.create(
+        RedisSink::new,
+        DataSinkBuilder
+            .create("org.apache.streampipes.sinks.databases.jvm.redis", 0)
+            .withLocales(Locales.EN)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .category(DataSinkType.DATABASE)
+            .requiredStream(StreamRequirementsBuilder
+                .create()
+                .requiredPropertyWithUnaryMapping(EpRequirements.anyProperty(),
+                    Labels.withId(EVENT_PRIMARY_KEY),
+                    PropertyScope.NONE).build())
+            
.requiredSingleValueSelection(Labels.withId(EVENT_KEY_AUTO_INCREMENT), 
Options.from("False", "True"))
+            .requiredTextParameter(Labels.withId(REDIS_HOST_KEY))
+            .requiredIntegerParameter(Labels.withId(REDIS_PORT_KEY), 6379)
+            .requiredIntegerParameter(Labels.withId(EVENT_TTL_KEY), -1)
+            // TODO: Use this after optional parameters implementation
+            //  .requiredSecret(Labels.withId(REDIS_PASSWORD_KEY))
+            //  .requiredTextParameter(Labels.withId(REDIS_CLIENT_KEY))
+            .requiredIntegerParameter(Labels.withId(REDIS_INDEX_KEY), -1)
+            
.requiredIntegerParameter(Labels.withId(REDIS_POOL_MAX_ACTIVE_KEY), 8)
+            .requiredIntegerParameter(Labels.withId(REDIS_POOL_MAX_IDLE_KEY), 
8)
+            .requiredIntegerParameter(Labels.withId(REDIS_POOL_MAX_WAIT_KEY), 
-1)
+            .requiredIntegerParameter(Labels.withId(REDIS_POOL_TIMEOUT_KEY), 
2000)
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(SinkParams parameters, EventSinkRuntimeContext 
runtimeContext) throws SpRuntimeException {
+  public void onPipelineStarted(IDataSinkParameters parameters, 
EventSinkRuntimeContext runtimeContext) {
     var extractor = parameters.extractor();
     String redisHost = extractor.singleValueParameter(REDIS_HOST_KEY, 
String.class);
     Integer redisPort = extractor.singleValueParameter(REDIS_PORT_KEY, 
Integer.class);
@@ -123,7 +127,7 @@ public class RedisSink extends StreamPipesDataSink {
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
+  public void onPipelineStopped() {
     redis.onDetach();
   }
 }
diff --git 
a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/tsfile/TsFileSink.java
 
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/tsfile/TsFileSink.java
index 0bdf442cca..7b135b56b6 100644
--- 
a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/tsfile/TsFileSink.java
+++ 
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/tsfile/TsFileSink.java
@@ -19,10 +19,12 @@
 package org.apache.streampipes.sinks.databases.jvm.tsfile;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
+import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
 import org.apache.streampipes.model.DataSinkType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataSinkDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.runtime.field.AbstractField;
 import org.apache.streampipes.model.schema.EventProperty;
@@ -31,13 +33,12 @@ import org.apache.streampipes.model.schema.EventSchema;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.sdk.builder.DataSinkBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.Options;
 import org.apache.streampipes.vocabulary.XSD;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
 
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.exception.write.WriteProcessException;
@@ -55,7 +56,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
-public class TsFileSink extends StreamPipesDataSink {
+public class TsFileSink implements IStreamPipesDataSink {
 
   private static final Logger log = LoggerFactory.getLogger(TsFileSink.class);
   public static final String DEVICE_ID_KEY = "device_id";
@@ -104,29 +105,32 @@ public class TsFileSink extends StreamPipesDataSink {
   private boolean aligned = false;
 
   @Override
-  public DataSinkDescription declareModel() {
-    return DataSinkBuilder
-        .create("org.apache.streampipes.sinks.databases.jvm.tsfile", 0)
-        .withLocales(Locales.EN)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON).
-        category(DataSinkType.DATABASE)
-        .requiredTextParameter(Labels.withId(TSFILE_NAME_KEY))
-        .requiredTextParameter(Labels.withId(DEVICE_ID_KEY))
-        .requiredTextParameter(Labels.withId(TSFILE_GENERATION_DIRECTORY_KRY))
-        .requiredLongParameter(Labels.withId(MAX_TSFILE_SIZE_KEY), 1024L * 
1024 * 10)
-        .requiredLongParameter(Labels.withId(MAX_FLUSH_DISK_SIZE_KEY), 
Long.MAX_VALUE)
-        .requiredSingleValueSelection(Labels.withId(ALIGNED), 
Options.from("False", "True"))
-        .requiredStream(
-                StreamRequirementsBuilder.create()
-                        .requiredPropertyWithUnaryMapping
-                                (EpRequirements.timestampReq(), 
Labels.withId(TIMESTAMP_MAPPING_KEY),
-                                PropertyScope.NONE).build()
-        )
-        .build();
+  public IDataSinkConfiguration declareConfig() {
+    return DataSinkConfiguration.create(
+        TsFileSink::new,
+        DataSinkBuilder
+            .create("org.apache.streampipes.sinks.databases.jvm.tsfile", 0)
+            .withLocales(Locales.EN)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON).
+            category(DataSinkType.DATABASE)
+            .requiredTextParameter(Labels.withId(TSFILE_NAME_KEY))
+            .requiredTextParameter(Labels.withId(DEVICE_ID_KEY))
+            
.requiredTextParameter(Labels.withId(TSFILE_GENERATION_DIRECTORY_KRY))
+            .requiredLongParameter(Labels.withId(MAX_TSFILE_SIZE_KEY), 1024L * 
1024 * 10)
+            .requiredLongParameter(Labels.withId(MAX_FLUSH_DISK_SIZE_KEY), 
Long.MAX_VALUE)
+            .requiredSingleValueSelection(Labels.withId(ALIGNED), 
Options.from("False", "True"))
+            .requiredStream(
+                    StreamRequirementsBuilder.create()
+                            .requiredPropertyWithUnaryMapping
+                                    (EpRequirements.timestampReq(), 
Labels.withId(TIMESTAMP_MAPPING_KEY),
+                                    PropertyScope.NONE).build()
+            )
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(SinkParams parameters, EventSinkRuntimeContext 
runtimeContext) throws SpRuntimeException {
+  public void onPipelineStarted(IDataSinkParameters parameters, 
EventSinkRuntimeContext runtimeContext) {
     this.tsFileName = 
parameters.extractor().singleValueParameter(TSFILE_NAME_KEY, String.class);
     this.deviceId = parameters.extractor().singleValueParameter(DEVICE_ID_KEY, 
String.class);
     this.dirAbsolutePath = 
parameters.extractor().singleValueParameter(TSFILE_GENERATION_DIRECTORY_KRY, 
String.class);
@@ -148,7 +152,7 @@ public class TsFileSink extends StreamPipesDataSink {
   }
 
   @Override
-  public void onDetach() {
+  public void onPipelineStopped() {
     try {
       if (tsFileWriter != null){
         tsFileWriter.close();
diff --git 
a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
 
b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
index 1fbb400dd8..eb4565271d 100644
--- 
a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
+++ 
b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
@@ -25,14 +25,16 @@ import org.apache.streampipes.dataexplorer.TimeSeriesStore;
 import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
 import org.apache.streampipes.dataexplorer.management.DataExplorerDispatcher;
 import 
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
+import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
 import org.apache.streampipes.extensions.api.runtime.SupportsRuntimeConfig;
 import org.apache.streampipes.model.DataSinkType;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
 import 
org.apache.streampipes.model.datalake.DataLakeMeasureSchemaUpdateStrategy;
 import org.apache.streampipes.model.datalake.RetentionTimeConfig;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataSinkDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.EventSchema;
 import org.apache.streampipes.model.schema.PropertyScope;
@@ -40,19 +42,18 @@ import 
org.apache.streampipes.model.staticproperty.RuntimeResolvableAnyStaticPro
 import org.apache.streampipes.model.staticproperty.StaticProperty;
 import org.apache.streampipes.sdk.builder.DataSinkBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.Options;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
 
-public class DataLakeSink extends StreamPipesDataSink implements 
SupportsRuntimeConfig {
+public class DataLakeSink implements IStreamPipesDataSink, 
SupportsRuntimeConfig {
 
   private static final String DATABASE_MEASUREMENT_KEY = "db_measurement";
   private static final String TIMESTAMP_MAPPING_KEY = "timestamp_mapping";
@@ -69,33 +70,36 @@ public class DataLakeSink extends StreamPipesDataSink 
implements SupportsRuntime
 
 
   @Override
-  public DataSinkDescription declareModel() {
-    return DataSinkBuilder
-        .create("org.apache.streampipes.sinks.internal.jvm.datalake", 2)
-        .withLocales(Locales.EN)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .category(DataSinkType.INTERNAL)
-        .requiredStream(StreamRequirementsBuilder
-                            .create()
-                            .requiredPropertyWithUnaryMapping(
-                                EpRequirements.timestampReq(),
-                                Labels.withId(TIMESTAMP_MAPPING_KEY),
-                                PropertyScope.NONE
-                            )
-                            .build())
-        .requiredTextParameter(Labels.withId(DATABASE_MEASUREMENT_KEY))
-        .requiredSingleValueSelection(
-            Labels.withId(SCHEMA_UPDATE_KEY),
-            Options.from(SCHEMA_UPDATE_OPTION, EXTEND_EXISTING_SCHEMA_OPTION)
-        )
-        
.requiredMultiValueSelectionFromContainer(Labels.withId(DIMENSIONS_KEY))
-        .requiredSlideToggle(Labels.withId(IGNORE_DUPLICATES_KEY), false)
-        .build();
+  public IDataSinkConfiguration declareConfig() {
+    return DataSinkConfiguration.create(
+        DataLakeSink::new,
+        DataSinkBuilder
+            .create("org.apache.streampipes.sinks.internal.jvm.datalake", 2)
+            .withLocales(Locales.EN)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .category(DataSinkType.INTERNAL)
+            .requiredStream(StreamRequirementsBuilder
+                                .create()
+                                .requiredPropertyWithUnaryMapping(
+                                    EpRequirements.timestampReq(),
+                                    Labels.withId(TIMESTAMP_MAPPING_KEY),
+                                    PropertyScope.NONE
+                                )
+                                .build())
+            .requiredTextParameter(Labels.withId(DATABASE_MEASUREMENT_KEY))
+            .requiredSingleValueSelection(
+                Labels.withId(SCHEMA_UPDATE_KEY),
+                Options.from(SCHEMA_UPDATE_OPTION, 
EXTEND_EXISTING_SCHEMA_OPTION)
+            )
+            
.requiredMultiValueSelectionFromContainer(Labels.withId(DIMENSIONS_KEY))
+            .requiredSlideToggle(Labels.withId(IGNORE_DUPLICATES_KEY), false)
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(SinkParams parameters, EventSinkRuntimeContext 
runtimeContext) throws SpRuntimeException {
-  
+  public void onPipelineStarted(IDataSinkParameters parameters, 
EventSinkRuntimeContext runtimeContext) {
+
     var extractor = parameters.extractor();
     var timestampField = extractor.mappingPropertyValue(TIMESTAMP_MAPPING_KEY);
     var measureName = extractor.singleValueParameter(DATABASE_MEASUREMENT_KEY, 
String.class);
@@ -142,7 +146,7 @@ public class DataLakeSink extends StreamPipesDataSink 
implements SupportsRuntime
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
+  public void onPipelineStopped() {
     this.timeSeriesStore.close();
   }
 
diff --git 
a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/notification/InternalStreamPipesNotificationSink.java
 
b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/notification/InternalStreamPipesNotificationSink.java
index 72a13a2316..ba946c7f92 100644
--- 
a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/notification/InternalStreamPipesNotificationSink.java
+++ 
b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/notification/InternalStreamPipesNotificationSink.java
@@ -20,8 +20,9 @@ package 
org.apache.streampipes.sinks.internal.jvm.notification;
 
 
 import org.apache.streampipes.client.api.IStreamPipesClient;
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
 import org.apache.streampipes.model.DataSinkType;
 import org.apache.streampipes.model.Notification;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
@@ -29,10 +30,10 @@ import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.pe.shared.PlaceholderExtractor;
 import org.apache.streampipes.sdk.builder.DataSinkBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
 import org.apache.streampipes.wrapper.standalone.StreamPipesNotificationSink;
 
 import java.util.Date;
@@ -52,9 +53,8 @@ public class InternalStreamPipesNotificationSink extends 
StreamPipesNotification
 
 
   @Override
-  public void onInvocation(SinkParams parameters, EventSinkRuntimeContext 
context) throws
-                                                                               
    SpRuntimeException {
-    super.onInvocation(parameters, context);
+  public void onPipelineStarted(IDataSinkParameters parameters, 
EventSinkRuntimeContext context) {
+    super.onPipelineStarted(parameters, context);
     this.title = parameters.extractor()
                            .singleValueParameter(TITLE_KEY, String.class);
     this.content = parameters.extractor()
@@ -88,10 +88,21 @@ public class InternalStreamPipesNotificationSink extends 
StreamPipesNotification
 
 
   @Override
-  public void onDetach() throws SpRuntimeException {
+  public void onPipelineStopped() {
 
   }
 
+  @Override
+  public IDataSinkConfiguration declareConfig() {
+    var builder = declareModelWithoutSilentPeriod();
+    addSilentPeriodParameter(builder);
+
+    return DataSinkConfiguration.create(
+        InternalStreamPipesNotificationSink::new,
+        builder.build()
+    );
+  }
+
   @Override
   public DataSinkBuilder declareModelWithoutSilentPeriod() {
     return DataSinkBuilder
diff --git 
a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesNotificationSink.java
 
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesNotificationSink.java
index d4fc478e55..8954ae12bf 100644
--- 
a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesNotificationSink.java
+++ 
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesNotificationSink.java
@@ -18,13 +18,12 @@
 
 package org.apache.streampipes.wrapper.standalone;
 
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
 import 
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
-import org.apache.streampipes.model.graph.DataSinkDescription;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.sdk.builder.DataSinkBuilder;
 import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
 
 import java.time.Instant;
 
@@ -33,10 +32,10 @@ import java.time.Instant;
  * <p>
  * It provides some share functionalities for all notification sinks in 
StreamPipes.
  * Thereby, it slightly modifies the interfaces to implement for the actual 
data sink compared
- * to sinks directly inheriting from {@link StreamPipesDataSink}.
+ * to sinks directly implementing {@link IStreamPipesDataSink}.
  */
 
-public abstract class StreamPipesNotificationSink extends StreamPipesDataSink {
+public abstract class StreamPipesNotificationSink implements 
IStreamPipesDataSink {
 
   /**
    * Default waiting time in minutes between two consecutive notifications.
@@ -75,12 +74,12 @@ public abstract class StreamPipesNotificationSink extends 
StreamPipesDataSink {
   /**
    * This method is meant to be overridden by child classes.
    * It contains some important logic, and child classes are expected to call
-   * the super.onInvocation() to ensure proper behavior.
+   * the super.onPipelineStarted() to ensure proper behavior.
    */
-  public void onInvocation(
-      SinkParams parameters,
+  public void onPipelineStarted(
+      IDataSinkParameters parameters,
       EventSinkRuntimeContext runtimeContext
-  ) throws SpRuntimeException {
+  ) {
     // convert input given in minutes to seconds
     // this is later used to determine if a notification should be sent
     this.silentPeriodInSeconds = parameters.extractor()
@@ -106,12 +105,23 @@ public abstract class StreamPipesNotificationSink extends 
StreamPipesDataSink {
    */
   public abstract void onNotificationEvent(Event inputEvent);
 
-  @Override
-  public DataSinkDescription declareModel() {
-
-    var builder = declareModelWithoutSilentPeriod();
+  /**
+   * Abstract method to be implemented by subclasses for declaring the 
description of the data sink.
+   * Unlike {@link IStreamPipesDataSink#declareConfig()} it is expected to 
return a {@link DataSinkBuilder}
+   *
+   * @return The DataSinkBuilder representing the notification model.
+   */
+  public abstract DataSinkBuilder declareModelWithoutSilentPeriod();
 
-    builder.requiredIntegerParameter(
+  /**
+   * Helper method to add the silent period parameter to the data sink builder.
+   * This is typically called by subclasses when building their configuration.
+   *
+   * @param builder The DataSinkBuilder to add the silent period parameter to.
+   * @return The DataSinkBuilder with the silent period parameter added.
+   */
+  protected DataSinkBuilder addSilentPeriodParameter(DataSinkBuilder builder) {
+    return builder.requiredIntegerParameter(
         Labels.from(
             KEY_SILENT_PERIOD,
             "Silent Period [min]",
@@ -119,18 +129,6 @@ public abstract class StreamPipesNotificationSink extends 
StreamPipesDataSink {
         ),
         DEFAULT_WAITING_TIME_MINUTES
     );
-
-    return builder.build();
-
   }
 
-  /**
-   * Abstract method to be implemented by subclasses for declaring the 
description of the data sink.
-   * Unlike {@link StreamPipesDataSink#declareModel()} it is expected to 
return a {@link DataSinkBuilder}
-   *
-   * @return The DataSinkBuilder representing the notification model.
-   */
-
-  public abstract DataSinkBuilder declareModelWithoutSilentPeriod();
-
 }

Reply via email to