This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch 3927-migrate-all-data-sinks-from-streampipesdatasink-to-istreampipesdatasink
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/3927-migrate-all-data-sinks-from-streampipesdatasink-to-istreampipesdatasink by this push:
     new 4ec158b4c6 refactor(#3927): Update data sink implementations to use new configuration interface
4ec158b4c6 is described below

commit 4ec158b4c641a2ae9129279ba7cc96682441c1a1
Author: Philipp Zehnder <[email protected]>
AuthorDate: Fri Nov 14 07:48:08 2025 +0100

    refactor(#3927): Update data sink implementations to use new configuration interface
---
 .../sinks/databases/jvm/couchdb/CouchDbSink.java   | 45 ++++++++++--------
 .../sinks/databases/jvm/ditto/DittoSink.java       | 55 ++++++++++++----------
 .../sinks/databases/jvm/iotdb/IotDbSink.java       | 55 ++++++++++++----------
 .../sinks/databases/jvm/milvus/MilvusSink.java     | 52 ++++++++++----------
 .../sinks/databases/jvm/parquet/ParquetSink.java   | 54 +++++++++++----------
 5 files changed, 142 insertions(+), 119 deletions(-)

diff --git a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/couchdb/CouchDbSink.java b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/couchdb/CouchDbSink.java
index 2d763767a6..b228758fdf 100644
--- a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/couchdb/CouchDbSink.java
+++ b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/couchdb/CouchDbSink.java
@@ -19,24 +19,25 @@
 package org.apache.streampipes.sinks.databases.jvm.couchdb;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
+import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
 import org.apache.streampipes.model.DataSinkType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataSinkDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.runtime.EventConverter;
 import org.apache.streampipes.sdk.builder.DataSinkBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
 
 import org.lightcouch.CouchDbClient;
 import org.lightcouch.CouchDbProperties;
 
-public class CouchDbSink extends StreamPipesDataSink {
+public class CouchDbSink implements IStreamPipesDataSink {
 
   private static final String DATABASE_HOST_KEY = "db_host";
   private static final String DATABASE_PORT_KEY = "db_port";
@@ -47,24 +48,28 @@ public class CouchDbSink extends StreamPipesDataSink {
   private CouchDbClient couchDbClient;
 
   @Override
-  public DataSinkDescription declareModel() {
-    return 
DataSinkBuilder.create("org.apache.streampipes.sinks.databases.jvm.couchdb", 0)
-        .withLocales(Locales.EN)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .category(DataSinkType.DATABASE)
-        .requiredStream(StreamRequirementsBuilder
-            .create()
-            .requiredProperty(EpRequirements.anyProperty())
-            .build())
-        .requiredTextParameter(Labels.withId(DATABASE_HOST_KEY))
-        .requiredIntegerParameter(Labels.withId(DATABASE_PORT_KEY))
-        .requiredTextParameter(Labels.withId(DATABASE_NAME_KEY))
-        .build();
+  public IDataSinkConfiguration declareConfig() {
+    return DataSinkConfiguration.create(
+        CouchDbSink::new,
+        
DataSinkBuilder.create("org.apache.streampipes.sinks.databases.jvm.couchdb", 0)
+            .withLocales(Locales.EN)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .category(DataSinkType.DATABASE)
+            .requiredStream(StreamRequirementsBuilder
+                .create()
+                .requiredProperty(EpRequirements.anyProperty())
+                .build())
+            .requiredTextParameter(Labels.withId(DATABASE_HOST_KEY))
+            .requiredIntegerParameter(Labels.withId(DATABASE_PORT_KEY))
+            .requiredTextParameter(Labels.withId(DATABASE_NAME_KEY))
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(SinkParams parameters,
-                           EventSinkRuntimeContext runtimeContext) throws 
SpRuntimeException {
+  public void onPipelineStarted(
+      IDataSinkParameters parameters,
+      EventSinkRuntimeContext runtimeContext) {
     var extractor = parameters.extractor();
     String hostname = extractor.singleValueParameter(DATABASE_HOST_KEY, 
String.class);
     Integer port = extractor.singleValueParameter(DATABASE_PORT_KEY, 
Integer.class);
@@ -94,7 +99,7 @@ public class CouchDbSink extends StreamPipesDataSink {
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
+  public void onPipelineStopped() {
     this.couchDbClient.shutdown();
   }
 }
diff --git a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/ditto/DittoSink.java b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/ditto/DittoSink.java
index ab2c3f10d2..5fb96397d1 100644
--- a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/ditto/DittoSink.java
+++ b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/ditto/DittoSink.java
@@ -18,19 +18,20 @@
 package org.apache.streampipes.sinks.databases.jvm.ditto;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
+import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
 import org.apache.streampipes.model.DataSinkType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataSinkDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.sdk.builder.DataSinkBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
 
 import org.eclipse.ditto.client.DittoClient;
 import org.eclipse.ditto.client.DittoClients;
@@ -51,7 +52,7 @@ import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 
-public class DittoSink extends StreamPipesDataSink {
+public class DittoSink implements IStreamPipesDataSink {
 
   private static final String DITTO_API_ENDPOINT_KEY = "dittoApiEndpointKey";
   private static final String DITTO_USER_KEY = "dittoUserKey";
@@ -70,27 +71,31 @@ public class DittoSink extends StreamPipesDataSink {
 
 
   @Override
-  public DataSinkDescription declareModel() {
-    return DataSinkBuilder
-        .create("org.apache.streampipes.sinks.databases.ditto", 0)
-        .category(DataSinkType.FORWARD)
-        .withLocales(Locales.EN)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        
.requiredStream(StreamRequirementsBuilder.create().requiredPropertyWithNaryMapping(
-            EpRequirements.anyProperty(),
-            Labels.withId(SELECTED_FIELDS_KEY),
-            PropertyScope.NONE).build())
-        .requiredTextParameter(Labels.withId(DITTO_API_ENDPOINT_KEY))
-        .requiredTextParameter(Labels.withId(DITTO_USER_KEY))
-        .requiredSecret(Labels.withId(DITTO_PASSWORD_KEY))
-        .requiredTextParameter(Labels.withId(DITTO_THING_ID_KEY))
-        .requiredTextParameter(Labels.withId(DITTO_FEATURE_ID_KEY))
-        .build();
+  public IDataSinkConfiguration declareConfig() {
+    return DataSinkConfiguration.create(
+        DittoSink::new,
+        DataSinkBuilder
+            .create("org.apache.streampipes.sinks.databases.ditto", 0)
+            .category(DataSinkType.FORWARD)
+            .withLocales(Locales.EN)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            
.requiredStream(StreamRequirementsBuilder.create().requiredPropertyWithNaryMapping(
+                EpRequirements.anyProperty(),
+                Labels.withId(SELECTED_FIELDS_KEY),
+                PropertyScope.NONE).build())
+            .requiredTextParameter(Labels.withId(DITTO_API_ENDPOINT_KEY))
+            .requiredTextParameter(Labels.withId(DITTO_USER_KEY))
+            .requiredSecret(Labels.withId(DITTO_PASSWORD_KEY))
+            .requiredTextParameter(Labels.withId(DITTO_THING_ID_KEY))
+            .requiredTextParameter(Labels.withId(DITTO_FEATURE_ID_KEY))
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(SinkParams parameters,
-                           EventSinkRuntimeContext runtimeContext) throws 
SpRuntimeException {
+  public void onPipelineStarted(
+      IDataSinkParameters parameters,
+      EventSinkRuntimeContext runtimeContext) {
     var extractor = parameters.extractor();
     String dittoApiEndpoint = extractor.textParameter(DITTO_API_ENDPOINT_KEY);
     String dittoUser = extractor.textParameter(DITTO_USER_KEY);
@@ -104,12 +109,12 @@ public class DittoSink extends StreamPipesDataSink {
         .endpoint(dittoApiEndpoint)
         .build();
     WebSocketMessagingProvider provider = 
WebSocketMessagingProvider.newInstance(configuration,
-        AuthenticationProviders
+                                                                               
  AuthenticationProviders
             .basic(BasicAuthenticationConfiguration.newBuilder()
                 .username(dittoUser)
                 .password(dittoPassword)
                 .build()),
-        Executors.newFixedThreadPool(4));
+                                                                               
  Executors.newFixedThreadPool(4));
     this.client = DittoClients.newInstance(provider);
 
     TwinThingHandle twinHandle = client.twin().forId(ThingId.of(thingId));
@@ -157,7 +162,7 @@ public class DittoSink extends StreamPipesDataSink {
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
+  public void onPipelineStopped() {
     this.client.destroy();
   }
 }
diff --git a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDbSink.java b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDbSink.java
index 44dd0dda0e..7c967c1e60 100644
--- a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDbSink.java
+++ b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDbSink.java
@@ -19,20 +19,21 @@
 package org.apache.streampipes.sinks.databases.jvm.iotdb;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
+import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
 import org.apache.streampipes.model.DataSinkType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataSinkDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.runtime.field.AbstractField;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.sdk.builder.DataSinkBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
 
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
@@ -45,7 +46,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-public class IotDbSink extends StreamPipesDataSink {
+public class IotDbSink implements IStreamPipesDataSink {
 
   private static final Logger LOG = LoggerFactory.getLogger(IotDbSink.class);
 
@@ -72,29 +73,33 @@ public class IotDbSink extends StreamPipesDataSink {
   private SessionPool sessionPool;
 
   @Override
-  public DataSinkDescription declareModel() {
-    return DataSinkBuilder
-        .create("org.apache.streampipes.sinks.databases.jvm.iotdb", 0)
-        .withLocales(Locales.EN)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON).
-        category(DataSinkType.DATABASE)
-        .requiredStream(
-            StreamRequirementsBuilder.create()
-                
.requiredPropertyWithUnaryMapping(EpRequirements.timestampReq(), 
Labels.withId(TIMESTAMP_MAPPING_KEY),
-                    PropertyScope.NONE).build()
-        )
-        .requiredTextParameter(Labels.withId(HOST_KEY))
-        .requiredIntegerParameter(Labels.withId(PORT_KEY), 6667)
-        .requiredTextParameter(Labels.withId(USER_KEY), "root")
-        .requiredSecret(Labels.withId(PASSWORD_KEY))
-        .requiredTextParameter(Labels.withId(DATABASE_KEY))
-        .requiredTextParameter(Labels.withId(DEVICE_KEY))
-        .build();
+  public IDataSinkConfiguration declareConfig() {
+    return DataSinkConfiguration.create(
+        IotDbSink::new,
+        DataSinkBuilder
+            .create("org.apache.streampipes.sinks.databases.jvm.iotdb", 0)
+            .withLocales(Locales.EN)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON).
+            category(DataSinkType.DATABASE)
+            .requiredStream(
+                StreamRequirementsBuilder.create()
+                    
.requiredPropertyWithUnaryMapping(EpRequirements.timestampReq(), 
Labels.withId(TIMESTAMP_MAPPING_KEY),
+                        PropertyScope.NONE).build()
+            )
+            .requiredTextParameter(Labels.withId(HOST_KEY))
+            .requiredIntegerParameter(Labels.withId(PORT_KEY), 6667)
+            .requiredTextParameter(Labels.withId(USER_KEY), "root")
+            .requiredSecret(Labels.withId(PASSWORD_KEY))
+            .requiredTextParameter(Labels.withId(DATABASE_KEY))
+            .requiredTextParameter(Labels.withId(DEVICE_KEY))
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(SinkParams parameters,
-                           EventSinkRuntimeContext runtimeContext) throws 
SpRuntimeException {
+  public void onPipelineStarted(
+      IDataSinkParameters parameters,
+      EventSinkRuntimeContext runtimeContext) {
     var extractor = parameters.extractor();
     final String host = extractor.singleValueParameter(HOST_KEY, String.class);
     final Integer port = extractor.singleValueParameter(PORT_KEY, 
Integer.class);
@@ -181,7 +186,7 @@ public class IotDbSink extends StreamPipesDataSink {
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
+  public void onPipelineStopped() {
     sessionPool.close();
   }
 }
diff --git a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/milvus/MilvusSink.java b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/milvus/MilvusSink.java
index 5cb55a6564..0870adddd6 100644
--- a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/milvus/MilvusSink.java
+++ b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/milvus/MilvusSink.java
@@ -19,10 +19,12 @@
 package org.apache.streampipes.sinks.databases.jvm.milvus;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
+import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
 import org.apache.streampipes.model.DataSinkType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataSinkDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.EventProperty;
 import org.apache.streampipes.model.schema.EventPropertyNested;
@@ -31,13 +33,12 @@ import org.apache.streampipes.model.schema.EventSchema;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.sdk.builder.DataSinkBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.Options;
 import org.apache.streampipes.vocabulary.XSD;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
 
 import com.google.gson.Gson;
 import com.google.gson.JsonObject;
@@ -63,7 +64,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class MilvusSink extends StreamPipesDataSink {
+public class MilvusSink implements IStreamPipesDataSink {
   public static final String MILVUS_URI_KEY = "milvus_uri";
   public static final String MILVUS_TOKEN_KEY = "milvus_token";
 
@@ -123,22 +124,24 @@ public class MilvusSink extends StreamPipesDataSink {
   };
 
   @Override
-  public DataSinkDescription declareModel() {
-    return DataSinkBuilder
-        .create("org.apache.streampipes.sinks.databases.jvm.milvus", 0)
-        .withLocales(Locales.EN)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON).
-        category(DataSinkType.DATABASE)
-        .requiredTextParameter(Labels.withId(MILVUS_URI_KEY))
-        .requiredTextParameter(Labels.withId(MILVUS_TOKEN_KEY), "root:Milvus")
-        .requiredTextParameter(Labels.withId(MILVUS_DBNAME_KEY))
-        .requiredTextParameter(Labels.withId(DATABASE_REPLICA_NUMBER_KEY), "2")
-        .requiredTextParameter(Labels.withId(COLLECTION_NAME_KEY))
-        .requiredTextParameter(Labels.withId(PRIMARY), "id")
-        .requiredIntegerParameter(Labels.withId(DIMENSION), 2)
-        .requiredStream(StreamRequirementsBuilder
-            .create()
-            .requiredPropertyWithUnaryMapping(EpRequirements.anyProperty(),
+  public IDataSinkConfiguration declareConfig() {
+    return DataSinkConfiguration.create(
+        MilvusSink::new,
+        DataSinkBuilder
+            .create("org.apache.streampipes.sinks.databases.jvm.milvus", 0)
+            .withLocales(Locales.EN)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON).
+            category(DataSinkType.DATABASE)
+            .requiredTextParameter(Labels.withId(MILVUS_URI_KEY))
+            .requiredTextParameter(Labels.withId(MILVUS_TOKEN_KEY), 
"root:Milvus")
+            .requiredTextParameter(Labels.withId(MILVUS_DBNAME_KEY))
+            .requiredTextParameter(Labels.withId(DATABASE_REPLICA_NUMBER_KEY), 
"2")
+            .requiredTextParameter(Labels.withId(COLLECTION_NAME_KEY))
+            .requiredTextParameter(Labels.withId(PRIMARY), "id")
+            .requiredIntegerParameter(Labels.withId(DIMENSION), 2)
+            .requiredStream(StreamRequirementsBuilder
+                .create()
+                .requiredPropertyWithUnaryMapping(EpRequirements.anyProperty(),
                 Labels.withId(VECTOR_KEY),
                 PropertyScope.NONE)
             .build())
@@ -146,12 +149,13 @@ public class MilvusSink extends StreamPipesDataSink {
              Options.from(INDEX_MAP.keySet().toArray(new String[0])))
         .requiredSingleValueSelection(Labels.withId(METRIC_TYPE),
              Options.from(METRIC_TYPE_MAP.keySet().toArray(new String[0])))
-        .build();
+        .build()
+    );
   }
 
   @Override
-  public void onInvocation(SinkParams parameters,
-                           EventSinkRuntimeContext runtimeContext) throws 
SpRuntimeException {
+  public void onPipelineStarted(IDataSinkParameters parameters,
+                                EventSinkRuntimeContext runtimeContext) {
     var extractor = parameters.extractor();
     final String uri = extractor.singleValueParameter(MILVUS_URI_KEY, 
String.class);
     final String token = extractor.singleValueParameter(MILVUS_TOKEN_KEY, 
String.class);
@@ -234,7 +238,7 @@ public class MilvusSink extends StreamPipesDataSink {
   }
 
   @Override
-  public void onDetach() {
+  public void onPipelineStopped() {
     client.close();
     pool.close();
   }
diff --git a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/parquet/ParquetSink.java b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/parquet/ParquetSink.java
index 91d4c700a7..3f2a080dbe 100644
--- a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/parquet/ParquetSink.java
+++ b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/parquet/ParquetSink.java
@@ -19,10 +19,12 @@
 package org.apache.streampipes.sinks.databases.jvm.parquet;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
+import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
 import org.apache.streampipes.model.DataSinkType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataSinkDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.runtime.field.AbstractField;
 import org.apache.streampipes.model.schema.EventProperty;
@@ -30,13 +32,12 @@ import org.apache.streampipes.model.schema.EventPropertyPrimitive;
 import org.apache.streampipes.model.schema.EventSchema;
 import org.apache.streampipes.sdk.builder.DataSinkBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.Options;
 import org.apache.streampipes.vocabulary.XSD;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
 
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
@@ -60,7 +61,7 @@ import java.util.Map;
 import java.util.Random;
 
 
-public class ParquetSink extends StreamPipesDataSink {
+public class ParquetSink implements IStreamPipesDataSink {
 
   private static final Logger log = LoggerFactory.getLogger(ParquetSink.class);
   private static final String SCHEMA_NAME_KEY = "schema_name";
@@ -111,29 +112,32 @@ public class ParquetSink extends StreamPipesDataSink {
   };
 
   @Override
-  public DataSinkDescription declareModel() {
-    return DataSinkBuilder
-        .create("org.apache.streampipes.sinks.databases.jvm.parquet", 0)
-        .withLocales(Locales.EN)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON).
-        category(DataSinkType.DATABASE)
-        .requiredStream(StreamRequirementsBuilder
-                .create()
-                .requiredProperty(EpRequirements.anyProperty())
-                .build())
-        .requiredTextParameter(Labels.withId(SCHEMA_NAME_KEY))
-        .requiredTextParameter(Labels.withId(SCHEMA_NAMESPACE_KEY))
-        .requiredTextParameter(Labels.withId(PARQUET_FILE_NAME_KEY))
-        .requiredTextParameter(Labels.withId(PARQUET_GENERATION_DIRECTORY_KRY))
-        .requiredIntegerParameter(Labels.withId(ROW_GROUP_SIZE_KEY), 134217728)
-        .requiredIntegerParameter(Labels.withId(PAGE_SIZE_KEY), 1048576)
-        
.requiredSingleValueSelection(Labels.withId(COMPRESSION_CODEC_NAME_KEY),
-                Options.from(COMPRESSION_CODEC_NAME_MAP.keySet().toArray(new 
String[0])))
-        .build();
+  public IDataSinkConfiguration declareConfig() {
+    return DataSinkConfiguration.create(
+        ParquetSink::new,
+        DataSinkBuilder
+            .create("org.apache.streampipes.sinks.databases.jvm.parquet", 0)
+            .withLocales(Locales.EN)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON).
+            category(DataSinkType.DATABASE)
+            .requiredStream(StreamRequirementsBuilder
+                    .create()
+                    .requiredProperty(EpRequirements.anyProperty())
+                    .build())
+            .requiredTextParameter(Labels.withId(SCHEMA_NAME_KEY))
+            .requiredTextParameter(Labels.withId(SCHEMA_NAMESPACE_KEY))
+            .requiredTextParameter(Labels.withId(PARQUET_FILE_NAME_KEY))
+            
.requiredTextParameter(Labels.withId(PARQUET_GENERATION_DIRECTORY_KRY))
+            .requiredIntegerParameter(Labels.withId(ROW_GROUP_SIZE_KEY), 
134217728)
+            .requiredIntegerParameter(Labels.withId(PAGE_SIZE_KEY), 1048576)
+            
.requiredSingleValueSelection(Labels.withId(COMPRESSION_CODEC_NAME_KEY),
+                    
Options.from(COMPRESSION_CODEC_NAME_MAP.keySet().toArray(new String[0])))
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(SinkParams parameters, EventSinkRuntimeContext 
runtimeContext) throws SpRuntimeException {
+  public void onPipelineStarted(IDataSinkParameters parameters, 
EventSinkRuntimeContext runtimeContext) {
     this.schemaName = 
parameters.extractor().singleValueParameter(SCHEMA_NAME_KEY, String.class);
     this.schemaNamespace = 
parameters.extractor().singleValueParameter(SCHEMA_NAMESPACE_KEY, String.class);
     this.parquetFileName = 
parameters.extractor().singleValueParameter(PARQUET_FILE_NAME_KEY, 
String.class);
@@ -157,7 +161,7 @@ public class ParquetSink extends StreamPipesDataSink {
   }
 
   @Override
-  public void onDetach() {
+  public void onPipelineStopped() {
     try {
       if (writer != null){
         writer.close();

Reply via email to