This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch
3927-migrate-all-data-sinks-from-streampipesdatasink-to-istreampipesdatasink
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to
refs/heads/3927-migrate-all-data-sinks-from-streampipesdatasink-to-istreampipesdatasink
by this push:
new a697015234 refactor(#3927): Update data sink implementations to use
new configuration interface
a697015234 is described below
commit a6970152343b49b34cd69b8a1219d35146f1f693
Author: Philipp Zehnder <[email protected]>
AuthorDate: Fri Nov 14 08:30:40 2025 +0100
refactor(#3927): Update data sink implementations to use new configuration
interface
---
.../sinks/notifications/jvm/email/EmailSink.java | 25 ++++++++++++++++------
.../notifications/jvm/onesignal/OneSignalSink.java | 23 +++++++++++++++-----
.../jvm/slack/SlackNotificationSink.java | 23 +++++++++++++++-----
.../notifications/jvm/telegram/TelegramSink.java | 23 +++++++++++++++-----
4 files changed, 73 insertions(+), 21 deletions(-)
diff --git
a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/email/EmailSink.java
b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/email/EmailSink.java
index 7b9ff905ff..17850ab9fc 100644
---
a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/email/EmailSink.java
+++
b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/email/EmailSink.java
@@ -20,7 +20,9 @@ package org.apache.streampipes.sinks.notifications.jvm.email;
import org.apache.streampipes.client.api.IStreamPipesClient;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
import org.apache.streampipes.model.DataSinkType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
import org.apache.streampipes.model.mail.SpEmail;
@@ -28,10 +30,10 @@ import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.pe.shared.PlaceholderExtractor;
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
import org.apache.streampipes.wrapper.standalone.StreamPipesNotificationSink;
import java.util.Collections;
@@ -47,6 +49,17 @@ public class EmailSink extends StreamPipesNotificationSink {
private IStreamPipesClient client;
+ @Override
+ public IDataSinkConfiguration declareConfig() {
+ var builder = declareModelWithoutSilentPeriod();
+ addSilentPeriodParameter(builder);
+
+ return DataSinkConfiguration.create(
+ EmailSink::new,
+ builder.build()
+ );
+ }
+
@Override
public DataSinkBuilder declareModelWithoutSilentPeriod() {
return DataSinkBuilder
@@ -64,11 +77,11 @@ public class EmailSink extends StreamPipesNotificationSink {
}
@Override
- public void onInvocation(
- SinkParams parameters,
+ public void onPipelineStarted(
+ IDataSinkParameters parameters,
EventSinkRuntimeContext runtimeContext
- ) throws SpRuntimeException {
- super.onInvocation(parameters, runtimeContext);
+ ) {
+ super.onPipelineStarted(parameters, runtimeContext);
var extractor = parameters.extractor();
String toEmail = extractor.singleValueParameter(TO_EMAIL_ADRESS,
String.class);
@@ -90,7 +103,7 @@ public class EmailSink extends StreamPipesNotificationSink {
}
@Override
- public void onDetach() throws SpRuntimeException {
+ public void onPipelineStopped() {
}
}
diff --git
a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/onesignal/OneSignalSink.java
b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/onesignal/OneSignalSink.java
index 4dcc04431b..55863b941c 100644
---
a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/onesignal/OneSignalSink.java
+++
b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/onesignal/OneSignalSink.java
@@ -19,16 +19,18 @@
package org.apache.streampipes.sinks.notifications.jvm.onesignal;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
import org.apache.streampipes.model.DataSinkType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
import org.apache.streampipes.wrapper.standalone.StreamPipesNotificationSink;
import org.apache.http.HttpEntity;
@@ -53,6 +55,17 @@ public class OneSignalSink extends
StreamPipesNotificationSink {
private String appId;
private String apiKey;
+ @Override
+ public IDataSinkConfiguration declareConfig() {
+ var builder = declareModelWithoutSilentPeriod();
+ addSilentPeriodParameter(builder);
+
+ return DataSinkConfiguration.create(
+ OneSignalSink::new,
+ builder.build()
+ );
+ }
+
@Override
public DataSinkBuilder declareModelWithoutSilentPeriod() {
return DataSinkBuilder
@@ -70,9 +83,9 @@ public class OneSignalSink extends
StreamPipesNotificationSink {
}
@Override
- public void onInvocation(SinkParams parameters,
- EventSinkRuntimeContext runtimeContext) throws
SpRuntimeException {
- super.onInvocation(parameters, runtimeContext);
+ public void onPipelineStarted(IDataSinkParameters parameters,
+ EventSinkRuntimeContext runtimeContext) {
+ super.onPipelineStarted(parameters, runtimeContext);
var extractor = parameters.extractor();
content = extractor.singleValueParameter(CONTENT_KEY, String.class);
appId = extractor.singleValueParameter(APP_ID, String.class);
@@ -108,7 +121,7 @@ public class OneSignalSink extends
StreamPipesNotificationSink {
}
@Override
- public void onDetach() throws SpRuntimeException {
+ public void onPipelineStopped() {
}
}
diff --git
a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/slack/SlackNotificationSink.java
b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/slack/SlackNotificationSink.java
index 612af135c7..d7fbc3bcad 100644
---
a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/slack/SlackNotificationSink.java
+++
b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/slack/SlackNotificationSink.java
@@ -19,17 +19,19 @@
package org.apache.streampipes.sinks.notifications.jvm.slack;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
import org.apache.streampipes.model.DataSinkType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.Options;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
import org.apache.streampipes.wrapper.standalone.StreamPipesNotificationSink;
import com.ullink.slack.simpleslackapi.SlackChannel;
@@ -55,6 +57,17 @@ public class SlackNotificationSink extends
StreamPipesNotificationSink {
private String userChannel;
private String originalMessage;
+ @Override
+ public IDataSinkConfiguration declareConfig() {
+ var builder = declareModelWithoutSilentPeriod();
+ addSilentPeriodParameter(builder);
+
+ return DataSinkConfiguration.create(
+ SlackNotificationSink::new,
+ builder.build()
+ );
+ }
+
@Override
public DataSinkBuilder declareModelWithoutSilentPeriod() {
return DataSinkBuilder
@@ -74,9 +87,9 @@ public class SlackNotificationSink extends
StreamPipesNotificationSink {
}
@Override
- public void onInvocation(SinkParams parameters,
- EventSinkRuntimeContext runtimeContext) throws
SpRuntimeException {
- super.onInvocation(parameters, runtimeContext);
+ public void onPipelineStarted(IDataSinkParameters parameters,
+ EventSinkRuntimeContext runtimeContext) {
+ super.onPipelineStarted(parameters, runtimeContext);
var extractor = parameters.extractor();
userChannel = extractor.singleValueParameter(RECEIVER, String.class);
@@ -124,7 +137,7 @@ public class SlackNotificationSink extends
StreamPipesNotificationSink {
}
@Override
- public void onDetach() throws SpRuntimeException {
+ public void onPipelineStopped() {
try {
this.session.disconnect();
} catch (IOException e) {
diff --git
a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/telegram/TelegramSink.java
b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/telegram/TelegramSink.java
index 7856ccd832..7eca095982 100644
---
a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/telegram/TelegramSink.java
+++
b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/telegram/TelegramSink.java
@@ -19,16 +19,18 @@
package org.apache.streampipes.sinks.notifications.jvm.telegram;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
import org.apache.streampipes.model.DataSinkType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
import org.apache.streampipes.wrapper.standalone.StreamPipesNotificationSink;
import okhttp3.OkHttpClient;
@@ -56,6 +58,17 @@ public class TelegramSink extends
StreamPipesNotificationSink {
private String channelOrChatId;
private String message;
+ @Override
+ public IDataSinkConfiguration declareConfig() {
+ var builder = declareModelWithoutSilentPeriod();
+ addSilentPeriodParameter(builder);
+
+ return DataSinkConfiguration.create(
+ TelegramSink::new,
+ builder.build()
+ );
+ }
+
@Override
public DataSinkBuilder declareModelWithoutSilentPeriod() {
return DataSinkBuilder
@@ -73,9 +86,9 @@ public class TelegramSink extends StreamPipesNotificationSink
{
}
@Override
- public void onInvocation(SinkParams parameters,
- EventSinkRuntimeContext runtimeContext) throws
SpRuntimeException {
- super.onInvocation(parameters, runtimeContext);
+ public void onPipelineStarted(IDataSinkParameters parameters,
+ EventSinkRuntimeContext runtimeContext) {
+ super.onPipelineStarted(parameters, runtimeContext);
var extractor = parameters.extractor();
apiKey = extractor.secretValue(BOT_API_KEY);
channelOrChatId = extractor.singleValueParameter(CHANNEL_NAME_OR_CHAT_ID,
String.class);
@@ -103,7 +116,7 @@ public class TelegramSink extends
StreamPipesNotificationSink {
}
@Override
- public void onDetach() throws SpRuntimeException {
+ public void onPipelineStopped() {
// Do nothing
}