This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch 3927-migrate-all-data-sinks-from-streampipesdatasink-to-istreampipesdatasink
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/3927-migrate-all-data-sinks-from-streampipesdatasink-to-istreampipesdatasink by this push:
     new a697015234 refactor(#3927): Update data sink implementations to use new configuration interface
a697015234 is described below

commit a6970152343b49b34cd69b8a1219d35146f1f693
Author: Philipp Zehnder <[email protected]>
AuthorDate: Fri Nov 14 08:30:40 2025 +0100

    refactor(#3927): Update data sink implementations to use new configuration interface
---
 .../sinks/notifications/jvm/email/EmailSink.java   | 25 ++++++++++++++++------
 .../notifications/jvm/onesignal/OneSignalSink.java | 23 +++++++++++++++-----
 .../jvm/slack/SlackNotificationSink.java           | 23 +++++++++++++++-----
 .../notifications/jvm/telegram/TelegramSink.java   | 23 +++++++++++++++-----
 4 files changed, 73 insertions(+), 21 deletions(-)

diff --git a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/email/EmailSink.java b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/email/EmailSink.java
index 7b9ff905ff..17850ab9fc 100644
--- a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/email/EmailSink.java
+++ b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/email/EmailSink.java
@@ -20,7 +20,9 @@ package org.apache.streampipes.sinks.notifications.jvm.email;
 
 import org.apache.streampipes.client.api.IStreamPipesClient;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
 import org.apache.streampipes.model.DataSinkType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
 import org.apache.streampipes.model.mail.SpEmail;
@@ -28,10 +30,10 @@ import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.pe.shared.PlaceholderExtractor;
 import org.apache.streampipes.sdk.builder.DataSinkBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
 import org.apache.streampipes.wrapper.standalone.StreamPipesNotificationSink;
 
 import java.util.Collections;
@@ -47,6 +49,17 @@ public class EmailSink extends StreamPipesNotificationSink {
 
   private IStreamPipesClient client;
 
+  @Override
+  public IDataSinkConfiguration declareConfig() {
+    var builder = declareModelWithoutSilentPeriod();
+    addSilentPeriodParameter(builder);
+
+    return DataSinkConfiguration.create(
+        EmailSink::new,
+        builder.build()
+    );
+  }
+
   @Override
   public DataSinkBuilder declareModelWithoutSilentPeriod() {
     return DataSinkBuilder
@@ -64,11 +77,11 @@ public class EmailSink extends StreamPipesNotificationSink {
   }
 
   @Override
-  public void onInvocation(
-      SinkParams parameters,
+  public void onPipelineStarted(
+      IDataSinkParameters parameters,
       EventSinkRuntimeContext runtimeContext
-  ) throws SpRuntimeException {
-    super.onInvocation(parameters, runtimeContext);
+  ) {
+    super.onPipelineStarted(parameters, runtimeContext);
 
     var extractor = parameters.extractor();
     String toEmail = extractor.singleValueParameter(TO_EMAIL_ADRESS, 
String.class);
@@ -90,7 +103,7 @@ public class EmailSink extends StreamPipesNotificationSink {
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
+  public void onPipelineStopped() {
 
   }
 }
diff --git a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/onesignal/OneSignalSink.java b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/onesignal/OneSignalSink.java
index 4dcc04431b..55863b941c 100644
--- a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/onesignal/OneSignalSink.java
+++ b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/onesignal/OneSignalSink.java
@@ -19,16 +19,18 @@
 package org.apache.streampipes.sinks.notifications.jvm.onesignal;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
 import org.apache.streampipes.model.DataSinkType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.sdk.builder.DataSinkBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
 import org.apache.streampipes.wrapper.standalone.StreamPipesNotificationSink;
 
 import org.apache.http.HttpEntity;
@@ -53,6 +55,17 @@ public class OneSignalSink extends 
StreamPipesNotificationSink {
   private String appId;
   private String apiKey;
 
+  @Override
+  public IDataSinkConfiguration declareConfig() {
+    var builder = declareModelWithoutSilentPeriod();
+    addSilentPeriodParameter(builder);
+
+    return DataSinkConfiguration.create(
+        OneSignalSink::new,
+        builder.build()
+    );
+  }
+
   @Override
   public DataSinkBuilder declareModelWithoutSilentPeriod() {
     return DataSinkBuilder
@@ -70,9 +83,9 @@ public class OneSignalSink extends 
StreamPipesNotificationSink {
   }
 
   @Override
-  public void onInvocation(SinkParams parameters,
-                           EventSinkRuntimeContext runtimeContext) throws 
SpRuntimeException {
-    super.onInvocation(parameters, runtimeContext);
+  public void onPipelineStarted(IDataSinkParameters parameters,
+                                EventSinkRuntimeContext runtimeContext) {
+    super.onPipelineStarted(parameters, runtimeContext);
     var extractor = parameters.extractor();
     content = extractor.singleValueParameter(CONTENT_KEY, String.class);
     appId = extractor.singleValueParameter(APP_ID, String.class);
@@ -108,7 +121,7 @@ public class OneSignalSink extends 
StreamPipesNotificationSink {
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
+  public void onPipelineStopped() {
 
   }
 }
diff --git a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/slack/SlackNotificationSink.java b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/slack/SlackNotificationSink.java
index 612af135c7..d7fbc3bcad 100644
--- a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/slack/SlackNotificationSink.java
+++ b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/slack/SlackNotificationSink.java
@@ -19,17 +19,19 @@
 package org.apache.streampipes.sinks.notifications.jvm.slack;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
 import org.apache.streampipes.model.DataSinkType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.sdk.builder.DataSinkBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.Options;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
 import org.apache.streampipes.wrapper.standalone.StreamPipesNotificationSink;
 
 import com.ullink.slack.simpleslackapi.SlackChannel;
@@ -55,6 +57,17 @@ public class SlackNotificationSink extends 
StreamPipesNotificationSink {
   private String userChannel;
   private String originalMessage;
 
+  @Override
+  public IDataSinkConfiguration declareConfig() {
+    var builder = declareModelWithoutSilentPeriod();
+    addSilentPeriodParameter(builder);
+
+    return DataSinkConfiguration.create(
+        SlackNotificationSink::new,
+        builder.build()
+    );
+  }
+
   @Override
   public DataSinkBuilder declareModelWithoutSilentPeriod() {
     return DataSinkBuilder
@@ -74,9 +87,9 @@ public class SlackNotificationSink extends 
StreamPipesNotificationSink {
   }
 
   @Override
-  public void onInvocation(SinkParams parameters,
-                           EventSinkRuntimeContext runtimeContext) throws 
SpRuntimeException {
-    super.onInvocation(parameters, runtimeContext);
+  public void onPipelineStarted(IDataSinkParameters parameters,
+                                EventSinkRuntimeContext runtimeContext) {
+    super.onPipelineStarted(parameters, runtimeContext);
     var extractor = parameters.extractor();
 
     userChannel = extractor.singleValueParameter(RECEIVER, String.class);
@@ -124,7 +137,7 @@ public class SlackNotificationSink extends 
StreamPipesNotificationSink {
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
+  public void onPipelineStopped() {
     try {
       this.session.disconnect();
     } catch (IOException e) {
diff --git a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/telegram/TelegramSink.java b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/telegram/TelegramSink.java
index 7856ccd832..7eca095982 100644
--- a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/telegram/TelegramSink.java
+++ b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/telegram/TelegramSink.java
@@ -19,16 +19,18 @@
 package org.apache.streampipes.sinks.notifications.jvm.telegram;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
 import org.apache.streampipes.model.DataSinkType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.sdk.builder.DataSinkBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
 import org.apache.streampipes.wrapper.standalone.StreamPipesNotificationSink;
 
 import okhttp3.OkHttpClient;
@@ -56,6 +58,17 @@ public class TelegramSink extends 
StreamPipesNotificationSink {
   private String channelOrChatId;
   private String message;
 
+  @Override
+  public IDataSinkConfiguration declareConfig() {
+    var builder = declareModelWithoutSilentPeriod();
+    addSilentPeriodParameter(builder);
+
+    return DataSinkConfiguration.create(
+        TelegramSink::new,
+        builder.build()
+    );
+  }
+
   @Override
   public DataSinkBuilder declareModelWithoutSilentPeriod() {
     return DataSinkBuilder
@@ -73,9 +86,9 @@ public class TelegramSink extends StreamPipesNotificationSink 
{
   }
 
   @Override
-  public void onInvocation(SinkParams parameters,
-                           EventSinkRuntimeContext runtimeContext) throws 
SpRuntimeException {
-    super.onInvocation(parameters, runtimeContext);
+  public void onPipelineStarted(IDataSinkParameters parameters,
+                                EventSinkRuntimeContext runtimeContext) {
+    super.onPipelineStarted(parameters, runtimeContext);
     var extractor = parameters.extractor();
     apiKey = extractor.secretValue(BOT_API_KEY);
     channelOrChatId = extractor.singleValueParameter(CHANNEL_NAME_OR_CHAT_ID, 
String.class);
@@ -103,7 +116,7 @@ public class TelegramSink extends 
StreamPipesNotificationSink {
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
+  public void onPipelineStopped() {
     // Do nothing
   }
 

Reply via email to