This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch
3927-migrate-all-data-sinks-from-streampipesdatasink-to-istreampipesdatasink
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to
refs/heads/3927-migrate-all-data-sinks-from-streampipesdatasink-to-istreampipesdatasink
by this push:
new 4ec158b4c6 refactor(#3927): Update data sink implementations to use
new configuration interface
4ec158b4c6 is described below
commit 4ec158b4c641a2ae9129279ba7cc96682441c1a1
Author: Philipp Zehnder <[email protected]>
AuthorDate: Fri Nov 14 07:48:08 2025 +0100
refactor(#3927): Update data sink implementations to use new configuration
interface
---
.../sinks/databases/jvm/couchdb/CouchDbSink.java | 45 ++++++++++--------
.../sinks/databases/jvm/ditto/DittoSink.java | 55 ++++++++++++----------
.../sinks/databases/jvm/iotdb/IotDbSink.java | 55 ++++++++++++----------
.../sinks/databases/jvm/milvus/MilvusSink.java | 52 ++++++++++----------
.../sinks/databases/jvm/parquet/ParquetSink.java | 54 +++++++++++----------
5 files changed, 142 insertions(+), 119 deletions(-)
diff --git
a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/couchdb/CouchDbSink.java
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/couchdb/CouchDbSink.java
index 2d763767a6..b228758fdf 100644
---
a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/couchdb/CouchDbSink.java
+++
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/couchdb/CouchDbSink.java
@@ -19,24 +19,25 @@
package org.apache.streampipes.sinks.databases.jvm.couchdb;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
+import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
import org.apache.streampipes.model.DataSinkType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataSinkDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.runtime.EventConverter;
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
import org.lightcouch.CouchDbClient;
import org.lightcouch.CouchDbProperties;
-public class CouchDbSink extends StreamPipesDataSink {
+public class CouchDbSink implements IStreamPipesDataSink {
private static final String DATABASE_HOST_KEY = "db_host";
private static final String DATABASE_PORT_KEY = "db_port";
@@ -47,24 +48,28 @@ public class CouchDbSink extends StreamPipesDataSink {
private CouchDbClient couchDbClient;
@Override
- public DataSinkDescription declareModel() {
- return
DataSinkBuilder.create("org.apache.streampipes.sinks.databases.jvm.couchdb", 0)
- .withLocales(Locales.EN)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .category(DataSinkType.DATABASE)
- .requiredStream(StreamRequirementsBuilder
- .create()
- .requiredProperty(EpRequirements.anyProperty())
- .build())
- .requiredTextParameter(Labels.withId(DATABASE_HOST_KEY))
- .requiredIntegerParameter(Labels.withId(DATABASE_PORT_KEY))
- .requiredTextParameter(Labels.withId(DATABASE_NAME_KEY))
- .build();
+ public IDataSinkConfiguration declareConfig() {
+ return DataSinkConfiguration.create(
+ CouchDbSink::new,
+
DataSinkBuilder.create("org.apache.streampipes.sinks.databases.jvm.couchdb", 0)
+ .withLocales(Locales.EN)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .category(DataSinkType.DATABASE)
+ .requiredStream(StreamRequirementsBuilder
+ .create()
+ .requiredProperty(EpRequirements.anyProperty())
+ .build())
+ .requiredTextParameter(Labels.withId(DATABASE_HOST_KEY))
+ .requiredIntegerParameter(Labels.withId(DATABASE_PORT_KEY))
+ .requiredTextParameter(Labels.withId(DATABASE_NAME_KEY))
+ .build()
+ );
}
@Override
- public void onInvocation(SinkParams parameters,
- EventSinkRuntimeContext runtimeContext) throws
SpRuntimeException {
+ public void onPipelineStarted(
+ IDataSinkParameters parameters,
+ EventSinkRuntimeContext runtimeContext) {
var extractor = parameters.extractor();
String hostname = extractor.singleValueParameter(DATABASE_HOST_KEY,
String.class);
Integer port = extractor.singleValueParameter(DATABASE_PORT_KEY,
Integer.class);
@@ -94,7 +99,7 @@ public class CouchDbSink extends StreamPipesDataSink {
}
@Override
- public void onDetach() throws SpRuntimeException {
+ public void onPipelineStopped() {
this.couchDbClient.shutdown();
}
}
diff --git
a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/ditto/DittoSink.java
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/ditto/DittoSink.java
index ab2c3f10d2..5fb96397d1 100644
---
a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/ditto/DittoSink.java
+++
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/ditto/DittoSink.java
@@ -18,19 +18,20 @@
package org.apache.streampipes.sinks.databases.jvm.ditto;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
+import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
import org.apache.streampipes.model.DataSinkType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataSinkDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
import org.eclipse.ditto.client.DittoClient;
import org.eclipse.ditto.client.DittoClients;
@@ -51,7 +52,7 @@ import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
-public class DittoSink extends StreamPipesDataSink {
+public class DittoSink implements IStreamPipesDataSink {
private static final String DITTO_API_ENDPOINT_KEY = "dittoApiEndpointKey";
private static final String DITTO_USER_KEY = "dittoUserKey";
@@ -70,27 +71,31 @@ public class DittoSink extends StreamPipesDataSink {
@Override
- public DataSinkDescription declareModel() {
- return DataSinkBuilder
- .create("org.apache.streampipes.sinks.databases.ditto", 0)
- .category(DataSinkType.FORWARD)
- .withLocales(Locales.EN)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-
.requiredStream(StreamRequirementsBuilder.create().requiredPropertyWithNaryMapping(
- EpRequirements.anyProperty(),
- Labels.withId(SELECTED_FIELDS_KEY),
- PropertyScope.NONE).build())
- .requiredTextParameter(Labels.withId(DITTO_API_ENDPOINT_KEY))
- .requiredTextParameter(Labels.withId(DITTO_USER_KEY))
- .requiredSecret(Labels.withId(DITTO_PASSWORD_KEY))
- .requiredTextParameter(Labels.withId(DITTO_THING_ID_KEY))
- .requiredTextParameter(Labels.withId(DITTO_FEATURE_ID_KEY))
- .build();
+ public IDataSinkConfiguration declareConfig() {
+ return DataSinkConfiguration.create(
+ DittoSink::new,
+ DataSinkBuilder
+ .create("org.apache.streampipes.sinks.databases.ditto", 0)
+ .category(DataSinkType.FORWARD)
+ .withLocales(Locales.EN)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+
.requiredStream(StreamRequirementsBuilder.create().requiredPropertyWithNaryMapping(
+ EpRequirements.anyProperty(),
+ Labels.withId(SELECTED_FIELDS_KEY),
+ PropertyScope.NONE).build())
+ .requiredTextParameter(Labels.withId(DITTO_API_ENDPOINT_KEY))
+ .requiredTextParameter(Labels.withId(DITTO_USER_KEY))
+ .requiredSecret(Labels.withId(DITTO_PASSWORD_KEY))
+ .requiredTextParameter(Labels.withId(DITTO_THING_ID_KEY))
+ .requiredTextParameter(Labels.withId(DITTO_FEATURE_ID_KEY))
+ .build()
+ );
}
@Override
- public void onInvocation(SinkParams parameters,
- EventSinkRuntimeContext runtimeContext) throws
SpRuntimeException {
+ public void onPipelineStarted(
+ IDataSinkParameters parameters,
+ EventSinkRuntimeContext runtimeContext) {
var extractor = parameters.extractor();
String dittoApiEndpoint = extractor.textParameter(DITTO_API_ENDPOINT_KEY);
String dittoUser = extractor.textParameter(DITTO_USER_KEY);
@@ -104,12 +109,12 @@ public class DittoSink extends StreamPipesDataSink {
.endpoint(dittoApiEndpoint)
.build();
WebSocketMessagingProvider provider =
WebSocketMessagingProvider.newInstance(configuration,
- AuthenticationProviders
+
AuthenticationProviders
.basic(BasicAuthenticationConfiguration.newBuilder()
.username(dittoUser)
.password(dittoPassword)
.build()),
- Executors.newFixedThreadPool(4));
+
Executors.newFixedThreadPool(4));
this.client = DittoClients.newInstance(provider);
TwinThingHandle twinHandle = client.twin().forId(ThingId.of(thingId));
@@ -157,7 +162,7 @@ public class DittoSink extends StreamPipesDataSink {
}
@Override
- public void onDetach() throws SpRuntimeException {
+ public void onPipelineStopped() {
this.client.destroy();
}
}
diff --git
a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDbSink.java
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDbSink.java
index 44dd0dda0e..7c967c1e60 100644
---
a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDbSink.java
+++
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDbSink.java
@@ -19,20 +19,21 @@
package org.apache.streampipes.sinks.databases.jvm.iotdb;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
+import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
import org.apache.streampipes.model.DataSinkType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataSinkDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.runtime.field.AbstractField;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
@@ -45,7 +46,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-public class IotDbSink extends StreamPipesDataSink {
+public class IotDbSink implements IStreamPipesDataSink {
private static final Logger LOG = LoggerFactory.getLogger(IotDbSink.class);
@@ -72,29 +73,33 @@ public class IotDbSink extends StreamPipesDataSink {
private SessionPool sessionPool;
@Override
- public DataSinkDescription declareModel() {
- return DataSinkBuilder
- .create("org.apache.streampipes.sinks.databases.jvm.iotdb", 0)
- .withLocales(Locales.EN)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON).
- category(DataSinkType.DATABASE)
- .requiredStream(
- StreamRequirementsBuilder.create()
-
.requiredPropertyWithUnaryMapping(EpRequirements.timestampReq(),
Labels.withId(TIMESTAMP_MAPPING_KEY),
- PropertyScope.NONE).build()
- )
- .requiredTextParameter(Labels.withId(HOST_KEY))
- .requiredIntegerParameter(Labels.withId(PORT_KEY), 6667)
- .requiredTextParameter(Labels.withId(USER_KEY), "root")
- .requiredSecret(Labels.withId(PASSWORD_KEY))
- .requiredTextParameter(Labels.withId(DATABASE_KEY))
- .requiredTextParameter(Labels.withId(DEVICE_KEY))
- .build();
+ public IDataSinkConfiguration declareConfig() {
+ return DataSinkConfiguration.create(
+ IotDbSink::new,
+ DataSinkBuilder
+ .create("org.apache.streampipes.sinks.databases.jvm.iotdb", 0)
+ .withLocales(Locales.EN)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON).
+ category(DataSinkType.DATABASE)
+ .requiredStream(
+ StreamRequirementsBuilder.create()
+
.requiredPropertyWithUnaryMapping(EpRequirements.timestampReq(),
Labels.withId(TIMESTAMP_MAPPING_KEY),
+ PropertyScope.NONE).build()
+ )
+ .requiredTextParameter(Labels.withId(HOST_KEY))
+ .requiredIntegerParameter(Labels.withId(PORT_KEY), 6667)
+ .requiredTextParameter(Labels.withId(USER_KEY), "root")
+ .requiredSecret(Labels.withId(PASSWORD_KEY))
+ .requiredTextParameter(Labels.withId(DATABASE_KEY))
+ .requiredTextParameter(Labels.withId(DEVICE_KEY))
+ .build()
+ );
}
@Override
- public void onInvocation(SinkParams parameters,
- EventSinkRuntimeContext runtimeContext) throws
SpRuntimeException {
+ public void onPipelineStarted(
+ IDataSinkParameters parameters,
+ EventSinkRuntimeContext runtimeContext) {
var extractor = parameters.extractor();
final String host = extractor.singleValueParameter(HOST_KEY, String.class);
final Integer port = extractor.singleValueParameter(PORT_KEY,
Integer.class);
@@ -181,7 +186,7 @@ public class IotDbSink extends StreamPipesDataSink {
}
@Override
- public void onDetach() throws SpRuntimeException {
+ public void onPipelineStopped() {
sessionPool.close();
}
}
diff --git
a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/milvus/MilvusSink.java
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/milvus/MilvusSink.java
index 5cb55a6564..0870adddd6 100644
---
a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/milvus/MilvusSink.java
+++
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/milvus/MilvusSink.java
@@ -19,10 +19,12 @@
package org.apache.streampipes.sinks.databases.jvm.milvus;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
+import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
import org.apache.streampipes.model.DataSinkType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataSinkDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventPropertyNested;
@@ -31,13 +33,12 @@ import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.Options;
import org.apache.streampipes.vocabulary.XSD;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
@@ -63,7 +64,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class MilvusSink extends StreamPipesDataSink {
+public class MilvusSink implements IStreamPipesDataSink {
public static final String MILVUS_URI_KEY = "milvus_uri";
public static final String MILVUS_TOKEN_KEY = "milvus_token";
@@ -123,22 +124,24 @@ public class MilvusSink extends StreamPipesDataSink {
};
@Override
- public DataSinkDescription declareModel() {
- return DataSinkBuilder
- .create("org.apache.streampipes.sinks.databases.jvm.milvus", 0)
- .withLocales(Locales.EN)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON).
- category(DataSinkType.DATABASE)
- .requiredTextParameter(Labels.withId(MILVUS_URI_KEY))
- .requiredTextParameter(Labels.withId(MILVUS_TOKEN_KEY), "root:Milvus")
- .requiredTextParameter(Labels.withId(MILVUS_DBNAME_KEY))
- .requiredTextParameter(Labels.withId(DATABASE_REPLICA_NUMBER_KEY), "2")
- .requiredTextParameter(Labels.withId(COLLECTION_NAME_KEY))
- .requiredTextParameter(Labels.withId(PRIMARY), "id")
- .requiredIntegerParameter(Labels.withId(DIMENSION), 2)
- .requiredStream(StreamRequirementsBuilder
- .create()
- .requiredPropertyWithUnaryMapping(EpRequirements.anyProperty(),
+ public IDataSinkConfiguration declareConfig() {
+ return DataSinkConfiguration.create(
+ MilvusSink::new,
+ DataSinkBuilder
+ .create("org.apache.streampipes.sinks.databases.jvm.milvus", 0)
+ .withLocales(Locales.EN)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON).
+ category(DataSinkType.DATABASE)
+ .requiredTextParameter(Labels.withId(MILVUS_URI_KEY))
+ .requiredTextParameter(Labels.withId(MILVUS_TOKEN_KEY),
"root:Milvus")
+ .requiredTextParameter(Labels.withId(MILVUS_DBNAME_KEY))
+ .requiredTextParameter(Labels.withId(DATABASE_REPLICA_NUMBER_KEY),
"2")
+ .requiredTextParameter(Labels.withId(COLLECTION_NAME_KEY))
+ .requiredTextParameter(Labels.withId(PRIMARY), "id")
+ .requiredIntegerParameter(Labels.withId(DIMENSION), 2)
+ .requiredStream(StreamRequirementsBuilder
+ .create()
+ .requiredPropertyWithUnaryMapping(EpRequirements.anyProperty(),
Labels.withId(VECTOR_KEY),
PropertyScope.NONE)
.build())
@@ -146,12 +149,13 @@ public class MilvusSink extends StreamPipesDataSink {
Options.from(INDEX_MAP.keySet().toArray(new String[0])))
.requiredSingleValueSelection(Labels.withId(METRIC_TYPE),
Options.from(METRIC_TYPE_MAP.keySet().toArray(new String[0])))
- .build();
+ .build()
+ );
}
@Override
- public void onInvocation(SinkParams parameters,
- EventSinkRuntimeContext runtimeContext) throws
SpRuntimeException {
+ public void onPipelineStarted(IDataSinkParameters parameters,
+ EventSinkRuntimeContext runtimeContext) {
var extractor = parameters.extractor();
final String uri = extractor.singleValueParameter(MILVUS_URI_KEY,
String.class);
final String token = extractor.singleValueParameter(MILVUS_TOKEN_KEY,
String.class);
@@ -234,7 +238,7 @@ public class MilvusSink extends StreamPipesDataSink {
}
@Override
- public void onDetach() {
+ public void onPipelineStopped() {
client.close();
pool.close();
}
diff --git
a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/parquet/ParquetSink.java
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/parquet/ParquetSink.java
index 91d4c700a7..3f2a080dbe 100644
---
a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/parquet/ParquetSink.java
+++
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/parquet/ParquetSink.java
@@ -19,10 +19,12 @@
package org.apache.streampipes.sinks.databases.jvm.parquet;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
+import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
import org.apache.streampipes.model.DataSinkType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataSinkDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.runtime.field.AbstractField;
import org.apache.streampipes.model.schema.EventProperty;
@@ -30,13 +32,12 @@ import
org.apache.streampipes.model.schema.EventPropertyPrimitive;
import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.Options;
import org.apache.streampipes.vocabulary.XSD;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
@@ -60,7 +61,7 @@ import java.util.Map;
import java.util.Random;
-public class ParquetSink extends StreamPipesDataSink {
+public class ParquetSink implements IStreamPipesDataSink {
private static final Logger log = LoggerFactory.getLogger(ParquetSink.class);
private static final String SCHEMA_NAME_KEY = "schema_name";
@@ -111,29 +112,32 @@ public class ParquetSink extends StreamPipesDataSink {
};
@Override
- public DataSinkDescription declareModel() {
- return DataSinkBuilder
- .create("org.apache.streampipes.sinks.databases.jvm.parquet", 0)
- .withLocales(Locales.EN)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON).
- category(DataSinkType.DATABASE)
- .requiredStream(StreamRequirementsBuilder
- .create()
- .requiredProperty(EpRequirements.anyProperty())
- .build())
- .requiredTextParameter(Labels.withId(SCHEMA_NAME_KEY))
- .requiredTextParameter(Labels.withId(SCHEMA_NAMESPACE_KEY))
- .requiredTextParameter(Labels.withId(PARQUET_FILE_NAME_KEY))
- .requiredTextParameter(Labels.withId(PARQUET_GENERATION_DIRECTORY_KRY))
- .requiredIntegerParameter(Labels.withId(ROW_GROUP_SIZE_KEY), 134217728)
- .requiredIntegerParameter(Labels.withId(PAGE_SIZE_KEY), 1048576)
-
.requiredSingleValueSelection(Labels.withId(COMPRESSION_CODEC_NAME_KEY),
- Options.from(COMPRESSION_CODEC_NAME_MAP.keySet().toArray(new
String[0])))
- .build();
+ public IDataSinkConfiguration declareConfig() {
+ return DataSinkConfiguration.create(
+ ParquetSink::new,
+ DataSinkBuilder
+ .create("org.apache.streampipes.sinks.databases.jvm.parquet", 0)
+ .withLocales(Locales.EN)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON).
+ category(DataSinkType.DATABASE)
+ .requiredStream(StreamRequirementsBuilder
+ .create()
+ .requiredProperty(EpRequirements.anyProperty())
+ .build())
+ .requiredTextParameter(Labels.withId(SCHEMA_NAME_KEY))
+ .requiredTextParameter(Labels.withId(SCHEMA_NAMESPACE_KEY))
+ .requiredTextParameter(Labels.withId(PARQUET_FILE_NAME_KEY))
+
.requiredTextParameter(Labels.withId(PARQUET_GENERATION_DIRECTORY_KRY))
+ .requiredIntegerParameter(Labels.withId(ROW_GROUP_SIZE_KEY),
134217728)
+ .requiredIntegerParameter(Labels.withId(PAGE_SIZE_KEY), 1048576)
+
.requiredSingleValueSelection(Labels.withId(COMPRESSION_CODEC_NAME_KEY),
+
Options.from(COMPRESSION_CODEC_NAME_MAP.keySet().toArray(new String[0])))
+ .build()
+ );
}
@Override
- public void onInvocation(SinkParams parameters, EventSinkRuntimeContext
runtimeContext) throws SpRuntimeException {
+ public void onPipelineStarted(IDataSinkParameters parameters,
EventSinkRuntimeContext runtimeContext) {
this.schemaName =
parameters.extractor().singleValueParameter(SCHEMA_NAME_KEY, String.class);
this.schemaNamespace =
parameters.extractor().singleValueParameter(SCHEMA_NAMESPACE_KEY, String.class);
this.parquetFileName =
parameters.extractor().singleValueParameter(PARQUET_FILE_NAME_KEY,
String.class);
@@ -157,7 +161,7 @@ public class ParquetSink extends StreamPipesDataSink {
}
@Override
- public void onDetach() {
+ public void onPipelineStopped() {
try {
if (writer != null){
writer.close();