This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch
3927-migrate-all-data-sinks-from-streampipesdatasink-to-istreampipesdatasink
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to
refs/heads/3927-migrate-all-data-sinks-from-streampipesdatasink-to-istreampipesdatasink
by this push:
new 239a5ee48e refactor(#3927): Update data sink implementations to use
new configuration interface
239a5ee48e is described below
commit 239a5ee48e2aace8924d45c56fb9e1245393bb4a
Author: Philipp Zehnder <[email protected]>
AuthorDate: Fri Nov 14 08:18:15 2025 +0100
refactor(#3927): Update data sink implementations to use new configuration
interface
---
.../databases/jvm/postgresql/PostgreSqlSink.java | 59 +++++++++++---------
.../sinks/databases/jvm/redis/RedisSink.java | 64 ++++++++++++----------
.../sinks/databases/jvm/tsfile/TsFileSink.java | 54 +++++++++---------
.../sinks/internal/jvm/datalake/DataLakeSink.java | 62 +++++++++++----------
.../InternalStreamPipesNotificationSink.java | 23 ++++++--
.../standalone/StreamPipesNotificationSink.java | 50 ++++++++---------
6 files changed, 169 insertions(+), 143 deletions(-)
diff --git
a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlSink.java
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlSink.java
index 9643d18426..3c321640cb 100644
---
a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlSink.java
+++
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlSink.java
@@ -19,22 +19,23 @@
package org.apache.streampipes.sinks.databases.jvm.postgresql;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
+import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
import org.apache.streampipes.model.DataSinkType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataSinkDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.Options;
import org.apache.streampipes.sdk.helpers.Tuple2;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
-public class PostgreSqlSink extends StreamPipesDataSink {
+public class PostgreSqlSink implements IStreamPipesDataSink {
private static final String DATABASE_HOST_KEY = "db_host";
private static final String DATABASE_PORT_KEY = "db_port";
@@ -49,31 +50,35 @@ public class PostgreSqlSink extends StreamPipesDataSink {
private PostgreSql postgreSql;
@Override
- public DataSinkDescription declareModel() {
- return DataSinkBuilder
- .create("org.apache.streampipes.sinks.databases.jvm.postgresql", 0)
- .withLocales(Locales.EN)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .category(DataSinkType.DATABASE)
- .requiredStream(StreamRequirementsBuilder.create()
- .requiredProperty(EpRequirements.anyProperty())
- .build())
- .requiredTextParameter(Labels.withId(DATABASE_HOST_KEY))
- .requiredIntegerParameter(Labels.withId(DATABASE_PORT_KEY), 5432)
- .requiredTextParameter(Labels.withId(DATABASE_NAME_KEY))
- .requiredTextParameter(Labels.withId(DATABASE_TABLE_KEY))
- .requiredTextParameter(Labels.withId(DATABASE_USER_KEY))
- .requiredSecret(Labels.withId(DATABASE_PASSWORD_KEY))
- .requiredSingleValueSelection(Labels.withId(SSL_MODE),
- Options.from(
- new Tuple2<>("Yes", SSL_ENABLED),
- new Tuple2<>("No", SSL_DISABLED)))
- .build();
+ public IDataSinkConfiguration declareConfig() {
+ return DataSinkConfiguration.create(
+ PostgreSqlSink::new,
+ DataSinkBuilder
+ .create("org.apache.streampipes.sinks.databases.jvm.postgresql", 0)
+ .withLocales(Locales.EN)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .category(DataSinkType.DATABASE)
+ .requiredStream(StreamRequirementsBuilder.create()
+ .requiredProperty(EpRequirements.anyProperty())
+ .build())
+ .requiredTextParameter(Labels.withId(DATABASE_HOST_KEY))
+ .requiredIntegerParameter(Labels.withId(DATABASE_PORT_KEY), 5432)
+ .requiredTextParameter(Labels.withId(DATABASE_NAME_KEY))
+ .requiredTextParameter(Labels.withId(DATABASE_TABLE_KEY))
+ .requiredTextParameter(Labels.withId(DATABASE_USER_KEY))
+ .requiredSecret(Labels.withId(DATABASE_PASSWORD_KEY))
+ .requiredSingleValueSelection(Labels.withId(SSL_MODE),
+ Options.from(
+ new Tuple2<>("Yes", SSL_ENABLED),
+ new Tuple2<>("No", SSL_DISABLED)))
+ .build()
+ );
}
@Override
- public void onInvocation(SinkParams parameters,
- EventSinkRuntimeContext runtimeContext) throws
SpRuntimeException {
+ public void onPipelineStarted(
+ IDataSinkParameters parameters,
+ EventSinkRuntimeContext runtimeContext) {
var extractor = parameters.extractor();
String hostname = extractor.singleValueParameter(DATABASE_HOST_KEY,
String.class);
Integer port = extractor.singleValueParameter(DATABASE_PORT_KEY,
Integer.class);
@@ -103,7 +108,7 @@ public class PostgreSqlSink extends StreamPipesDataSink {
}
@Override
- public void onDetach() throws SpRuntimeException {
+ public void onPipelineStopped() {
postgreSql.onDetach();
}
}
diff --git
a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/redis/RedisSink.java
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/redis/RedisSink.java
index b8c0282389..b0d6ca809a 100644
---
a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/redis/RedisSink.java
+++
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/redis/RedisSink.java
@@ -19,22 +19,23 @@
package org.apache.streampipes.sinks.databases.jvm.redis;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
+import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
import org.apache.streampipes.model.DataSinkType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataSinkDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.Options;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
-public class RedisSink extends StreamPipesDataSink {
+public class RedisSink implements IStreamPipesDataSink {
private static final String EVENT_PRIMARY_KEY = "event_pk";
@@ -63,34 +64,37 @@ public class RedisSink extends StreamPipesDataSink {
private Redis redis;
@Override
- public DataSinkDescription declareModel() {
- return DataSinkBuilder
- .create("org.apache.streampipes.sinks.databases.jvm.redis", 0)
- .withLocales(Locales.EN)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .category(DataSinkType.DATABASE)
- .requiredStream(StreamRequirementsBuilder
- .create()
- .requiredPropertyWithUnaryMapping(EpRequirements.anyProperty(),
- Labels.withId(EVENT_PRIMARY_KEY),
- PropertyScope.NONE).build())
- .requiredSingleValueSelection(Labels.withId(EVENT_KEY_AUTO_INCREMENT),
Options.from("False", "True"))
- .requiredTextParameter(Labels.withId(REDIS_HOST_KEY))
- .requiredIntegerParameter(Labels.withId(REDIS_PORT_KEY), 6379)
- .requiredIntegerParameter(Labels.withId(EVENT_TTL_KEY), -1)
- // TODO: Use this after optional parameters implementation
- // .requiredSecret(Labels.withId(REDIS_PASSWORD_KEY))
- // .requiredTextParameter(Labels.withId(REDIS_CLIENT_KEY))
- .requiredIntegerParameter(Labels.withId(REDIS_INDEX_KEY), -1)
- .requiredIntegerParameter(Labels.withId(REDIS_POOL_MAX_ACTIVE_KEY), 8)
- .requiredIntegerParameter(Labels.withId(REDIS_POOL_MAX_IDLE_KEY), 8)
- .requiredIntegerParameter(Labels.withId(REDIS_POOL_MAX_WAIT_KEY), -1)
- .requiredIntegerParameter(Labels.withId(REDIS_POOL_TIMEOUT_KEY), 2000)
- .build();
+ public IDataSinkConfiguration declareConfig() {
+ return DataSinkConfiguration.create(
+ RedisSink::new,
+ DataSinkBuilder
+ .create("org.apache.streampipes.sinks.databases.jvm.redis", 0)
+ .withLocales(Locales.EN)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .category(DataSinkType.DATABASE)
+ .requiredStream(StreamRequirementsBuilder
+ .create()
+ .requiredPropertyWithUnaryMapping(EpRequirements.anyProperty(),
+ Labels.withId(EVENT_PRIMARY_KEY),
+ PropertyScope.NONE).build())
+
.requiredSingleValueSelection(Labels.withId(EVENT_KEY_AUTO_INCREMENT),
Options.from("False", "True"))
+ .requiredTextParameter(Labels.withId(REDIS_HOST_KEY))
+ .requiredIntegerParameter(Labels.withId(REDIS_PORT_KEY), 6379)
+ .requiredIntegerParameter(Labels.withId(EVENT_TTL_KEY), -1)
+ // TODO: Use this after optional parameters implementation
+ // .requiredSecret(Labels.withId(REDIS_PASSWORD_KEY))
+ // .requiredTextParameter(Labels.withId(REDIS_CLIENT_KEY))
+ .requiredIntegerParameter(Labels.withId(REDIS_INDEX_KEY), -1)
+
.requiredIntegerParameter(Labels.withId(REDIS_POOL_MAX_ACTIVE_KEY), 8)
+ .requiredIntegerParameter(Labels.withId(REDIS_POOL_MAX_IDLE_KEY),
8)
+ .requiredIntegerParameter(Labels.withId(REDIS_POOL_MAX_WAIT_KEY),
-1)
+ .requiredIntegerParameter(Labels.withId(REDIS_POOL_TIMEOUT_KEY),
2000)
+ .build()
+ );
}
@Override
- public void onInvocation(SinkParams parameters, EventSinkRuntimeContext
runtimeContext) throws SpRuntimeException {
+ public void onPipelineStarted(IDataSinkParameters parameters,
EventSinkRuntimeContext runtimeContext) {
var extractor = parameters.extractor();
String redisHost = extractor.singleValueParameter(REDIS_HOST_KEY,
String.class);
Integer redisPort = extractor.singleValueParameter(REDIS_PORT_KEY,
Integer.class);
@@ -123,7 +127,7 @@ public class RedisSink extends StreamPipesDataSink {
}
@Override
- public void onDetach() throws SpRuntimeException {
+ public void onPipelineStopped() {
redis.onDetach();
}
}
diff --git
a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/tsfile/TsFileSink.java
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/tsfile/TsFileSink.java
index 0bdf442cca..7b135b56b6 100644
---
a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/tsfile/TsFileSink.java
+++
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/tsfile/TsFileSink.java
@@ -19,10 +19,12 @@
package org.apache.streampipes.sinks.databases.jvm.tsfile;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
+import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
import org.apache.streampipes.model.DataSinkType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataSinkDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.runtime.field.AbstractField;
import org.apache.streampipes.model.schema.EventProperty;
@@ -31,13 +33,12 @@ import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.Options;
import org.apache.streampipes.vocabulary.XSD;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.write.WriteProcessException;
@@ -55,7 +56,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
-public class TsFileSink extends StreamPipesDataSink {
+public class TsFileSink implements IStreamPipesDataSink {
private static final Logger log = LoggerFactory.getLogger(TsFileSink.class);
public static final String DEVICE_ID_KEY = "device_id";
@@ -104,29 +105,32 @@ public class TsFileSink extends StreamPipesDataSink {
private boolean aligned = false;
@Override
- public DataSinkDescription declareModel() {
- return DataSinkBuilder
- .create("org.apache.streampipes.sinks.databases.jvm.tsfile", 0)
- .withLocales(Locales.EN)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON).
- category(DataSinkType.DATABASE)
- .requiredTextParameter(Labels.withId(TSFILE_NAME_KEY))
- .requiredTextParameter(Labels.withId(DEVICE_ID_KEY))
- .requiredTextParameter(Labels.withId(TSFILE_GENERATION_DIRECTORY_KRY))
- .requiredLongParameter(Labels.withId(MAX_TSFILE_SIZE_KEY), 1024L *
1024 * 10)
- .requiredLongParameter(Labels.withId(MAX_FLUSH_DISK_SIZE_KEY),
Long.MAX_VALUE)
- .requiredSingleValueSelection(Labels.withId(ALIGNED),
Options.from("False", "True"))
- .requiredStream(
- StreamRequirementsBuilder.create()
- .requiredPropertyWithUnaryMapping
- (EpRequirements.timestampReq(),
Labels.withId(TIMESTAMP_MAPPING_KEY),
- PropertyScope.NONE).build()
- )
- .build();
+ public IDataSinkConfiguration declareConfig() {
+ return DataSinkConfiguration.create(
+ TsFileSink::new,
+ DataSinkBuilder
+ .create("org.apache.streampipes.sinks.databases.jvm.tsfile", 0)
+ .withLocales(Locales.EN)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON).
+ category(DataSinkType.DATABASE)
+ .requiredTextParameter(Labels.withId(TSFILE_NAME_KEY))
+ .requiredTextParameter(Labels.withId(DEVICE_ID_KEY))
+
.requiredTextParameter(Labels.withId(TSFILE_GENERATION_DIRECTORY_KRY))
+ .requiredLongParameter(Labels.withId(MAX_TSFILE_SIZE_KEY), 1024L *
1024 * 10)
+ .requiredLongParameter(Labels.withId(MAX_FLUSH_DISK_SIZE_KEY),
Long.MAX_VALUE)
+ .requiredSingleValueSelection(Labels.withId(ALIGNED),
Options.from("False", "True"))
+ .requiredStream(
+ StreamRequirementsBuilder.create()
+ .requiredPropertyWithUnaryMapping
+ (EpRequirements.timestampReq(),
Labels.withId(TIMESTAMP_MAPPING_KEY),
+ PropertyScope.NONE).build()
+ )
+ .build()
+ );
}
@Override
- public void onInvocation(SinkParams parameters, EventSinkRuntimeContext
runtimeContext) throws SpRuntimeException {
+ public void onPipelineStarted(IDataSinkParameters parameters,
EventSinkRuntimeContext runtimeContext) {
this.tsFileName =
parameters.extractor().singleValueParameter(TSFILE_NAME_KEY, String.class);
this.deviceId = parameters.extractor().singleValueParameter(DEVICE_ID_KEY,
String.class);
this.dirAbsolutePath =
parameters.extractor().singleValueParameter(TSFILE_GENERATION_DIRECTORY_KRY,
String.class);
@@ -148,7 +152,7 @@ public class TsFileSink extends StreamPipesDataSink {
}
@Override
- public void onDetach() {
+ public void onPipelineStopped() {
try {
if (tsFileWriter != null){
tsFileWriter.close();
diff --git
a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
index 1fbb400dd8..eb4565271d 100644
---
a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
+++
b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
@@ -25,14 +25,16 @@ import org.apache.streampipes.dataexplorer.TimeSeriesStore;
import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
import org.apache.streampipes.dataexplorer.management.DataExplorerDispatcher;
import
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
+import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
import org.apache.streampipes.extensions.api.runtime.SupportsRuntimeConfig;
import org.apache.streampipes.model.DataSinkType;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import
org.apache.streampipes.model.datalake.DataLakeMeasureSchemaUpdateStrategy;
import org.apache.streampipes.model.datalake.RetentionTimeConfig;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataSinkDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.model.schema.PropertyScope;
@@ -40,19 +42,18 @@ import
org.apache.streampipes.model.staticproperty.RuntimeResolvableAnyStaticPro
import org.apache.streampipes.model.staticproperty.StaticProperty;
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.Options;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
-public class DataLakeSink extends StreamPipesDataSink implements
SupportsRuntimeConfig {
+public class DataLakeSink implements IStreamPipesDataSink,
SupportsRuntimeConfig {
private static final String DATABASE_MEASUREMENT_KEY = "db_measurement";
private static final String TIMESTAMP_MAPPING_KEY = "timestamp_mapping";
@@ -69,33 +70,36 @@ public class DataLakeSink extends StreamPipesDataSink
implements SupportsRuntime
@Override
- public DataSinkDescription declareModel() {
- return DataSinkBuilder
- .create("org.apache.streampipes.sinks.internal.jvm.datalake", 2)
- .withLocales(Locales.EN)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .category(DataSinkType.INTERNAL)
- .requiredStream(StreamRequirementsBuilder
- .create()
- .requiredPropertyWithUnaryMapping(
- EpRequirements.timestampReq(),
- Labels.withId(TIMESTAMP_MAPPING_KEY),
- PropertyScope.NONE
- )
- .build())
- .requiredTextParameter(Labels.withId(DATABASE_MEASUREMENT_KEY))
- .requiredSingleValueSelection(
- Labels.withId(SCHEMA_UPDATE_KEY),
- Options.from(SCHEMA_UPDATE_OPTION, EXTEND_EXISTING_SCHEMA_OPTION)
- )
-
.requiredMultiValueSelectionFromContainer(Labels.withId(DIMENSIONS_KEY))
- .requiredSlideToggle(Labels.withId(IGNORE_DUPLICATES_KEY), false)
- .build();
+ public IDataSinkConfiguration declareConfig() {
+ return DataSinkConfiguration.create(
+ DataLakeSink::new,
+ DataSinkBuilder
+ .create("org.apache.streampipes.sinks.internal.jvm.datalake", 2)
+ .withLocales(Locales.EN)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .category(DataSinkType.INTERNAL)
+ .requiredStream(StreamRequirementsBuilder
+ .create()
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.timestampReq(),
+ Labels.withId(TIMESTAMP_MAPPING_KEY),
+ PropertyScope.NONE
+ )
+ .build())
+ .requiredTextParameter(Labels.withId(DATABASE_MEASUREMENT_KEY))
+ .requiredSingleValueSelection(
+ Labels.withId(SCHEMA_UPDATE_KEY),
+ Options.from(SCHEMA_UPDATE_OPTION,
EXTEND_EXISTING_SCHEMA_OPTION)
+ )
+
.requiredMultiValueSelectionFromContainer(Labels.withId(DIMENSIONS_KEY))
+ .requiredSlideToggle(Labels.withId(IGNORE_DUPLICATES_KEY), false)
+ .build()
+ );
}
@Override
- public void onInvocation(SinkParams parameters, EventSinkRuntimeContext
runtimeContext) throws SpRuntimeException {
-
+ public void onPipelineStarted(IDataSinkParameters parameters,
EventSinkRuntimeContext runtimeContext) {
+
var extractor = parameters.extractor();
var timestampField = extractor.mappingPropertyValue(TIMESTAMP_MAPPING_KEY);
var measureName = extractor.singleValueParameter(DATABASE_MEASUREMENT_KEY,
String.class);
@@ -142,7 +146,7 @@ public class DataLakeSink extends StreamPipesDataSink
implements SupportsRuntime
}
@Override
- public void onDetach() throws SpRuntimeException {
+ public void onPipelineStopped() {
this.timeSeriesStore.close();
}
diff --git
a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/notification/InternalStreamPipesNotificationSink.java
b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/notification/InternalStreamPipesNotificationSink.java
index 72a13a2316..ba946c7f92 100644
---
a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/notification/InternalStreamPipesNotificationSink.java
+++
b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/notification/InternalStreamPipesNotificationSink.java
@@ -20,8 +20,9 @@ package
org.apache.streampipes.sinks.internal.jvm.notification;
import org.apache.streampipes.client.api.IStreamPipesClient;
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
import org.apache.streampipes.model.DataSinkType;
import org.apache.streampipes.model.Notification;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
@@ -29,10 +30,10 @@ import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.pe.shared.PlaceholderExtractor;
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
import org.apache.streampipes.wrapper.standalone.StreamPipesNotificationSink;
import java.util.Date;
@@ -52,9 +53,8 @@ public class InternalStreamPipesNotificationSink extends
StreamPipesNotification
@Override
- public void onInvocation(SinkParams parameters, EventSinkRuntimeContext
context) throws
-
SpRuntimeException {
- super.onInvocation(parameters, context);
+ public void onPipelineStarted(IDataSinkParameters parameters,
EventSinkRuntimeContext context) {
+ super.onPipelineStarted(parameters, context);
this.title = parameters.extractor()
.singleValueParameter(TITLE_KEY, String.class);
this.content = parameters.extractor()
@@ -88,10 +88,21 @@ public class InternalStreamPipesNotificationSink extends
StreamPipesNotification
@Override
- public void onDetach() throws SpRuntimeException {
+ public void onPipelineStopped() {
}
+ @Override
+ public IDataSinkConfiguration declareConfig() {
+ var builder = declareModelWithoutSilentPeriod();
+ addSilentPeriodParameter(builder);
+
+ return DataSinkConfiguration.create(
+ InternalStreamPipesNotificationSink::new,
+ builder.build()
+ );
+ }
+
@Override
public DataSinkBuilder declareModelWithoutSilentPeriod() {
return DataSinkBuilder
diff --git
a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesNotificationSink.java
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesNotificationSink.java
index d4fc478e55..8954ae12bf 100644
---
a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesNotificationSink.java
+++
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesNotificationSink.java
@@ -18,13 +18,12 @@
package org.apache.streampipes.wrapper.standalone;
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
import
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
-import org.apache.streampipes.model.graph.DataSinkDescription;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
import java.time.Instant;
@@ -33,10 +32,10 @@ import java.time.Instant;
* <p>
* It provides some share functionalities for all notification sinks in
StreamPipes.
* Thereby, it slightly modifies the interfaces to implement for the actual
data sink compared
- * to sinks directly inheriting from {@link StreamPipesDataSink}.
+ * to sinks directly implementing {@link IStreamPipesDataSink}.
*/
-public abstract class StreamPipesNotificationSink extends StreamPipesDataSink {
+public abstract class StreamPipesNotificationSink implements
IStreamPipesDataSink {
/**
* Default waiting time in minutes between two consecutive notifications.
@@ -75,12 +74,12 @@ public abstract class StreamPipesNotificationSink extends
StreamPipesDataSink {
/**
* This method is meant to be overridden by child classes.
* It contains some important logic, and child classes are expected to call
- * the super.onInvocation() to ensure proper behavior.
+ * the super.onPipelineStarted() to ensure proper behavior.
*/
- public void onInvocation(
- SinkParams parameters,
+ public void onPipelineStarted(
+ IDataSinkParameters parameters,
EventSinkRuntimeContext runtimeContext
- ) throws SpRuntimeException {
+ ) {
// convert input given in minutes to seconds
// this is later used to determine if a notification should be sent
this.silentPeriodInSeconds = parameters.extractor()
@@ -106,12 +105,23 @@ public abstract class StreamPipesNotificationSink extends
StreamPipesDataSink {
*/
public abstract void onNotificationEvent(Event inputEvent);
- @Override
- public DataSinkDescription declareModel() {
-
- var builder = declareModelWithoutSilentPeriod();
+ /**
+ * Abstract method to be implemented by subclasses for declaring the
description of the data sink.
+ * Unlike {@link IStreamPipesDataSink#declareConfig()} it is expected to
return a {@link DataSinkBuilder}
+ *
+ * @return The DataSinkBuilder representing the notification model.
+ */
+ public abstract DataSinkBuilder declareModelWithoutSilentPeriod();
- builder.requiredIntegerParameter(
+ /**
+ * Helper method to add the silent period parameter to the data sink builder.
+ * This is typically called by subclasses when building their configuration.
+ *
+ * @param builder The DataSinkBuilder to add the silent period parameter to.
+ * @return The DataSinkBuilder with the silent period parameter added.
+ */
+ protected DataSinkBuilder addSilentPeriodParameter(DataSinkBuilder builder) {
+ return builder.requiredIntegerParameter(
Labels.from(
KEY_SILENT_PERIOD,
"Silent Period [min]",
@@ -119,18 +129,6 @@ public abstract class StreamPipesNotificationSink extends
StreamPipesDataSink {
),
DEFAULT_WAITING_TIME_MINUTES
);
-
- return builder.build();
-
}
- /**
- * Abstract method to be implemented by subclasses for declaring the
description of the data sink.
- * Unlike {@link StreamPipesDataSink#declareModel()} it is expected to
return a {@link DataSinkBuilder}
- *
- * @return The DataSinkBuilder representing the notification model.
- */
-
- public abstract DataSinkBuilder declareModelWithoutSilentPeriod();
-
}