This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new f6e7a12157 feat: Enhance rest sink (#3573)
f6e7a12157 is described below
commit f6e7a12157b5fb06ea89c237bbf9ba73b442a6cd
Author: Dominik Riemer <[email protected]>
AuthorDate: Tue May 20 22:30:47 2025 +0200
feat: Enhance rest sink (#3573)
* Enhanced Rest Data Sink Component with ability to add headers, retry
functionality, and more extensive logging.
* Refactored labels to use labels.withId in coordination with string.en
instead of labels.from. This change simplifies the configuration and enhances
readability.
# Conflicts:
#
streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestSink.java
* Added retry paramaters to the rest sink to allow for the user to be able
to input if they want to retry the request and then configure the retry delay
and max number of retries.
* Updated RestSink version to version 1 for migrations
* Changed the "Is Rest Sink Enabled" static property from an "Options" type
to a slider since I learned that a slider represents true or false in
streampipes. I updated integerts from .requiredIntegerParameter to a static
property free text integer since these values more closely represent a static
property rather than a parameter. I also added the V0->V1 migration for those
currently using the RestSink.
* Adding Licenses to new addition files for apache RAT validations in maven
tests
* Refactored for Checksyle additions. Added checkstyle to IDE settings and
attempted to format correctly but I couldn't figure out everything in IDEA
Ultimate.
* Fix checkstyle issues
* Fix migration
* Fix checkstyle
---------
Co-authored-by: Matthew Holden <[email protected]>
Co-authored-by: Matthew <[email protected]>
---
.../jvm/BrokerSinksExtensionModuleExport.java | 5 +-
.../jvm/migrations/RestSinkMigrationV1.java | 149 +++++++++++++++++
.../brokers/jvm/rest/RestHeaderConfiguration.java | 25 +++
.../sinks/brokers/jvm/rest/RestSink.java | 184 ++++++++++++++++-----
.../strings.en | 18 ++
5 files changed, 343 insertions(+), 38 deletions(-)
diff --git
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokerSinksExtensionModuleExport.java
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokerSinksExtensionModuleExport.java
index 86fe680f7c..df4fce7bf2 100644
---
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokerSinksExtensionModuleExport.java
+++
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokerSinksExtensionModuleExport.java
@@ -24,6 +24,7 @@ import
org.apache.streampipes.extensions.api.migration.IModelMigrator;
import org.apache.streampipes.extensions.api.pe.IStreamPipesPipelineElement;
import
org.apache.streampipes.sinks.brokers.jvm.bufferrest.BufferRestPublisherSink;
import org.apache.streampipes.sinks.brokers.jvm.jms.JmsPublisherSink;
+import org.apache.streampipes.sinks.brokers.jvm.migrations.RestSinkMigrationV1;
import org.apache.streampipes.sinks.brokers.jvm.rabbitmq.RabbitMqPublisherSink;
import org.apache.streampipes.sinks.brokers.jvm.rest.RestSink;
import org.apache.streampipes.sinks.brokers.jvm.websocket.WebsocketServerSink;
@@ -50,6 +51,8 @@ public class BrokerSinksExtensionModuleExport implements
IExtensionModuleExport
@Override
public List<IModelMigrator<?, ?>> migrators() {
- return Collections.emptyList();
+ return List.of(
+ new RestSinkMigrationV1()
+ );
}
}
diff --git
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/migrations/RestSinkMigrationV1.java
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/migrations/RestSinkMigrationV1.java
new file mode 100644
index 0000000000..d5945c4eb2
--- /dev/null
+++
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/migrations/RestSinkMigrationV1.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.sinks.brokers.jvm.migrations;
+
+import
org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor;
+import org.apache.streampipes.extensions.api.migration.IDataSinkMigrator;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.migration.MigrationResult;
+import org.apache.streampipes.model.migration.ModelMigratorConfig;
+import org.apache.streampipes.model.staticproperty.CollectionStaticProperty;
+import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
+import org.apache.streampipes.model.staticproperty.SlideToggleStaticProperty;
+import org.apache.streampipes.model.staticproperty.StaticPropertyGroup;
+import org.apache.streampipes.sdk.StaticProperties;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sinks.brokers.jvm.rest.RestSink;
+import org.apache.streampipes.vocabulary.XSD;
+
+import java.util.List;
+
+public class RestSinkMigrationV1 implements IDataSinkMigrator {
+
+ @Override
+ public ModelMigratorConfig config() {
+ return new ModelMigratorConfig(
+ "org.apache.streampipes.sinks.brokers.jvm.rest",
+ SpServiceTagPrefix.DATA_SINK,
+ 0,
+ 1
+ );
+ }
+
+ @Override
+ public MigrationResult<DataSinkInvocation> migrate(DataSinkInvocation
element,
+
IDataSinkParameterExtractor extractor) throws RuntimeException {
+ // Extract the URL value from the old configuration
+ String urlValue = extractor.singleValueParameter(RestSink.URL_KEY,
String.class);
+ if (urlValue == null) {
+ return MigrationResult.failure(element, "URL property not found in old
configuration");
+ }
+
+ addRetryDelayKey(element);
+ addIsRetryEnabledKey(element);
+ addRetryMaxRetriesKey(element);
+ addHeaderCollectionKeys(element);
+
+ return MigrationResult.success(element);
+ }
+
+ public void addRetryDelayKey(DataSinkInvocation element) {
+ var label = Labels.from(
+ RestSink.RETRY_DELAY_MS_KEY,
+ "Retry Delay (ms)", "Duration in ms to wait for request to "
+ + "retry."
+ );
+ var staticProperty = new FreeTextStaticProperty(
+ label.getInternalId(),
+ label.getLabel(),
+ label.getDescription()
+ );
+ staticProperty.setRequiredDatatype(XSD.INTEGER);
+
+ element.getStaticProperties().add(staticProperty);
+ }
+
+ public void addIsRetryEnabledKey(DataSinkInvocation element) {
+ var label = Labels.from(RestSink.IS_RETRY_ENABLED_KEY, "Enable Retry",
+ "If enabled, the request will be retried at an interval defined by the
retry delay up to max retries.");
+ var staticProperty = new SlideToggleStaticProperty(
+ label.getInternalId(),
+ label.getLabel(),
+ label.getDescription(),
+ false
+ );
+ staticProperty.setSelected(false);
+ element.getStaticProperties().add(staticProperty);
+ }
+
+ public void addRetryMaxRetriesKey(DataSinkInvocation element) {
+ var label = Labels.from(RestSink.RETRY_MAX_RETRIES_KEY, "Max Retries",
+ "The maximum number of retries allowed for request to retry.");
+ var staticProperty = new FreeTextStaticProperty(
+ label.getInternalId(),
+ label.getLabel(),
+ label.getDescription()
+ );
+ staticProperty.setRequiredDatatype(XSD.INTEGER);
+ element.getStaticProperties().add(staticProperty);
+ }
+
+ public void addHeaderCollectionKeys(DataSinkInvocation element) {
+ StaticPropertyGroup staticPropertyGroup = new StaticPropertyGroup();
+ staticPropertyGroup.setLabel("Request Headers");
+ staticPropertyGroup.setInternalName(RestSink.HEADER_COLLECTION);
+ staticPropertyGroup.setHorizontalRendering(false);
+
+ var headerKeyLabel = StaticProperties.stringFreeTextProperty(
+ Labels.from(
+ RestSink.HEADER_KEY,
+ "Header",
+ "Optional custom headers to be included with the REST request."
+ )
+ );
+ headerKeyLabel.setValue("");
+ headerKeyLabel.setOptional(true);
+
+ var headerValueLabel = StaticProperties.stringFreeTextProperty(
+ Labels.from(
+ RestSink.HEADER_VALUE,
+ "Header Value",
+ "Optional custom headers to be included with the REST "
+ + "request."
+ )
+ );
+ headerValueLabel.setValue("");
+ headerValueLabel.setOptional(true);
+ staticPropertyGroup.getStaticProperties().addAll(
+ List.of(
+ headerKeyLabel,
+ headerValueLabel
+ )
+ );
+
+ var collection = new CollectionStaticProperty(
+ RestSink.HEADER_COLLECTION,
+ "Request Headers",
+ "Optional custom headers to be included with the REST request.",
+ staticPropertyGroup
+ );
+ element.getStaticProperties().add(collection);
+ }
+}
diff --git
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestHeaderConfiguration.java
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestHeaderConfiguration.java
new file mode 100644
index 0000000000..d3b94893ce
--- /dev/null
+++
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestHeaderConfiguration.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.sinks.brokers.jvm.rest;
+
+public record RestHeaderConfiguration(
+ String headerKey,
+ String headerValue
+) {
+}
diff --git
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestSink.java
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestSink.java
index e09f4f7f81..e1f96eea8f 100644
---
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestSink.java
+++
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestSink.java
@@ -18,83 +18,193 @@
package org.apache.streampipes.sinks.brokers.jvm.rest;
-
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.dataformat.JsonDataFormatDefinition;
+import
org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
+import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
import org.apache.streampipes.model.DataSinkType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataSinkDescription;
import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.staticproperty.CollectionStaticProperty;
+import org.apache.streampipes.model.staticproperty.StaticProperty;
+import org.apache.streampipes.model.staticproperty.StaticPropertyGroup;
+import org.apache.streampipes.sdk.StaticProperties;
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
+import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
+import org.apache.http.HttpResponse;
import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
import org.apache.http.entity.ContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
-public class RestSink extends StreamPipesDataSink {
+public class RestSink implements IStreamPipesDataSink {
private static final Logger LOG = LoggerFactory.getLogger(RestSink.class);
- private static final String URL_KEY = "url-key";
-
+ public static final String ID =
"org.apache.streampipes.sinks.brokers.jvm.rest";
+ public static final String URL_KEY = "url-key";
+ public static final String HEADER_COLLECTION = "header-collection";
+ public static final String HEADER_KEY = "header-key";
+ public static final String HEADER_VALUE = "header-value";
+ public static final String IS_RETRY_ENABLED_KEY = "is-retry-enabled-key";
+ public static final String RETRY_DELAY_MS_KEY = "retry-delay-ms-key";
+ public static final String RETRY_MAX_RETRIES_KEY = "retry-max-retries-key";
private String url;
private JsonDataFormatDefinition jsonDataFormatDefinition;
+ private List<RestHeaderConfiguration> headerConfigurations = new
ArrayList<>();
+ private boolean isRetryEnabled;
+ private int maxRetries;
+ private int retryDelayMs;
@Override
- public DataSinkDescription declareModel() {
- return
DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.rest", 0)
- .category(DataSinkType.FORWARD)
- .withLocales(Locales.EN)
- .withAssets(ExtensionAssetType.DOCUMENTATION)
- .requiredStream(StreamRequirementsBuilder
- .create()
- .requiredProperty(EpRequirements.anyProperty())
- .build())
- .requiredTextParameter(Labels.withId(URL_KEY),
- false, false)
- .build();
+ public IDataSinkConfiguration declareConfig() {
+ return DataSinkConfiguration.create(
+ RestSink::new,
+ DataSinkBuilder.create(ID, 1)
+ .category(DataSinkType.FORWARD)
+ .withAssets(ExtensionAssetType.DOCUMENTATION)
+ .withLocales(Locales.EN)
+ .requiredTextParameter(
+ Labels.withId(URL_KEY),
+ false,
+ false
+ )
+ .requiredSlideToggle(
+ Labels.withId(IS_RETRY_ENABLED_KEY),
+ false
+ )
+ .requiredStaticProperty(StaticProperties.integerFreeTextProperty(
+ Labels.withId(RETRY_DELAY_MS_KEY),
+ 100
+ ))
+ .requiredStaticProperty(StaticProperties.integerFreeTextProperty(
+ Labels.withId(RETRY_MAX_RETRIES_KEY),
+ 3
+ ))
+ .requiredStaticProperty(
+ StaticProperties.collection(
+ Labels.withId(HEADER_COLLECTION),
+ false,
+
StaticProperties.stringFreeTextProperty(Labels.withId(HEADER_KEY)),
+
StaticProperties.stringFreeTextProperty(Labels.withId(HEADER_VALUE))
+ )
+ )
+ .requiredStream(StreamRequirementsBuilder
+ .create()
+ .requiredProperty(EpRequirements.anyProperty())
+ .build())
+ .build()
+ );
}
- @Override
- public void onDetach() throws SpRuntimeException {
+ private List<RestHeaderConfiguration>
getHeaderConfigurations(IDataSinkParameterExtractor extractor) {
+ List<RestHeaderConfiguration> headers = new ArrayList<>();
+ var csp = (CollectionStaticProperty)
extractor.getStaticPropertyByName(HEADER_COLLECTION);
+
+ for (StaticProperty member : csp.getMembers()) {
+ var memberExtractor = getMemberExtractor(member);
+
+ var headerConfiguration = getHeaders(memberExtractor);
+ headers.add(headerConfiguration);
+ }
+ return headers;
+ }
+
+ private StaticPropertyExtractor getMemberExtractor(StaticProperty member) {
+ return StaticPropertyExtractor.from(
+ ((StaticPropertyGroup) member).getStaticProperties(),
+ new ArrayList<>()
+ );
+ }
+
+ private RestHeaderConfiguration getHeaders(StaticPropertyExtractor
memberExtractor) {
+ var headerKey = memberExtractor.textParameter(HEADER_KEY);
+ var headerValue = memberExtractor.textParameter(HEADER_VALUE);
+
+ return new RestHeaderConfiguration(headerKey, headerValue);
}
@Override
- public void onInvocation(SinkParams parameters,
- EventSinkRuntimeContext runtimeContext) throws
SpRuntimeException {
+ public void onPipelineStarted(IDataSinkParameters parameters,
+ EventSinkRuntimeContext runtimeContext) throws
SpRuntimeException {
jsonDataFormatDefinition = new JsonDataFormatDefinition();
url = parameters.extractor().singleValueParameter(URL_KEY, String.class);
+ headerConfigurations = getHeaderConfigurations(parameters.extractor());
+ isRetryEnabled =
parameters.extractor().slideToggleValue(IS_RETRY_ENABLED_KEY);
+ maxRetries = isRetryEnabled ?
parameters.extractor().singleValueParameter(RETRY_DELAY_MS_KEY, Integer.class)
: 0;
+ retryDelayMs = isRetryEnabled ?
parameters.extractor().singleValueParameter(RETRY_DELAY_MS_KEY, Integer.class)
: 0;
}
@Override
- public void onEvent(Event inputEvent) throws SpRuntimeException {
- byte[] json = null;
+ public void onEvent(Event event) {
try {
- json = jsonDataFormatDefinition.fromMap(inputEvent.getRaw());
+ // Set maximum attempts to 1 if not enabled
+ int maxAttempts = 1;
+ if (isRetryEnabled) {
+ maxAttempts = maxRetries;
+ }
+
+ byte[] json = jsonDataFormatDefinition.fromMap(event.getRaw());
+ for (int attempt = 0; attempt < maxAttempts; attempt++) {
+ try {
+ Request request = Request.Post(url)
+ .bodyByteArray(json, ContentType.APPLICATION_JSON)
+ .connectTimeout(1000)
+ .socketTimeout(100000);
+
+ for (RestHeaderConfiguration header : headerConfigurations) {
+ request.addHeader(header.headerKey(), header.headerValue());
+ }
+ Response response = request.execute();
+ HttpResponse httpResponse = response.returnResponse();
+ int statusCode = httpResponse.getStatusLine().getStatusCode();
+ if (statusCode >= 200 && statusCode < 300) {
+ LOG.info("Successfully sent event to {} with status {}", url,
statusCode);
+ return;
+ } else if (statusCode >= 500) {
+ LOG.warn("Server error {} from {}, retrying... (attempt {}/{})",
statusCode, url, attempt + 1, maxRetries);
+ } else {
+ LOG.error("Received status {} from {}, not retrying", statusCode,
url);
+ return;
+ }
+ } catch (IOException e) {
+ LOG.warn("IO error when sending to {}, retrying... (attempt {}/{}):
{}", url, attempt + 1, maxRetries,
+ e.getMessage());
+ }
+ if (attempt < maxRetries - 1) {
+ try {
+ Thread.sleep(retryDelayMs);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ LOG.error("Interrupted while waiting to retry");
+ return;
+ }
+ } else {
+ LOG.error("Failed to send event to {} after {} attempts", url,
maxRetries);
+ }
+ }
} catch (SpRuntimeException e) {
- LOG.error("Error while serializing event: " +
inputEvent.getSourceInfo().getSourceId() + " Exception: "
- + e);
+ LOG.error("Error while serializing event: {} Exception: {}",
event.getSourceInfo().getSourceId(), e);
}
+ }
- try {
- Request.Post(url)
- .bodyByteArray(json, ContentType.APPLICATION_JSON)
- .connectTimeout(1000)
- .socketTimeout(100000)
- .execute().returnContent().asString();
- } catch (IOException e) {
- LOG.error("Error while sending data to endpoint: " + url + " Exception:
" + e);
- }
+ @Override
+ public void onPipelineStopped() throws SpRuntimeException {
+ this.headerConfigurations = null;
}
}
diff --git
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.rest/strings.en
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.rest/strings.en
index 6bd89fddf7..37e30cf036 100644
---
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.rest/strings.en
+++
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.rest/strings.en
@@ -20,3 +20,21 @@
org.apache.streampipes.sinks.brokers.jvm.rest.description=Posts events to a REST
url-key.title=REST URL
url-key.description=URL of the REST endpoint
+
+header-collection.title=Request Headers
+header-collection.description=Optional custom headers to be included with the
REST request.
+
+header-key.title=Header
+header-key.description=Optional custom headers to be included with the REST
request.
+
+header-value.title=Header Value
+header-value.description=Optional custom headers to be included with the REST
request.
+
+is-retry-enabled-key.title=Enable Retry
+is-retry-enabled-key.description=If enabled, the request will be retried at an
interval defined by the retry delay up to max retries.
+
+retry-delay-ms-key.title=Retry Delay (ms)
+retry-delay-ms-key.description=Duration in ms to wait for request to retry.
+
+retry-max-retries-key.title=Max Retries
+retry-max-retries-key.description=The maximum number of retries allowed for
request to retry.
\ No newline at end of file