This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch enhance-rest-sink
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit a31c43e3b69e946cfae08fc60eca0d485e64b020
Author: Dominik Riemer <[email protected]>
AuthorDate: Tue Apr 22 13:16:59 2025 +0200

    Fix checkstyle issues
---
 .../jvm/BrokerSinksExtensionModuleExport.java      |   5 +-
 .../jvm/migrations/RestSinkMigrationV1.java        | 197 +++++++------
 .../brokers/jvm/rest/RestHeaderConfiguration.java  |   6 +-
 .../sinks/brokers/jvm/rest/RestSink.java           | 327 +++++++++++----------
 4 files changed, 275 insertions(+), 260 deletions(-)

diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokerSinksExtensionModuleExport.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokerSinksExtensionModuleExport.java
index 86fe680f7c..df4fce7bf2 100644
--- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokerSinksExtensionModuleExport.java
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokerSinksExtensionModuleExport.java
@@ -24,6 +24,7 @@ import org.apache.streampipes.extensions.api.migration.IModelMigrator;
 import org.apache.streampipes.extensions.api.pe.IStreamPipesPipelineElement;
 import 
org.apache.streampipes.sinks.brokers.jvm.bufferrest.BufferRestPublisherSink;
 import org.apache.streampipes.sinks.brokers.jvm.jms.JmsPublisherSink;
+import org.apache.streampipes.sinks.brokers.jvm.migrations.RestSinkMigrationV1;
 import org.apache.streampipes.sinks.brokers.jvm.rabbitmq.RabbitMqPublisherSink;
 import org.apache.streampipes.sinks.brokers.jvm.rest.RestSink;
 import org.apache.streampipes.sinks.brokers.jvm.websocket.WebsocketServerSink;
@@ -50,6 +51,8 @@ public class BrokerSinksExtensionModuleExport implements IExtensionModuleExport
 
   @Override
   public List<IModelMigrator<?, ?>> migrators() {
-    return Collections.emptyList();
+    return List.of(
+        new RestSinkMigrationV1()
+    );
   }
 }
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/migrations/RestSinkMigrationV1.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/migrations/RestSinkMigrationV1.java
index cfef1440e9..3a3bd4a140 100644
--- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/migrations/RestSinkMigrationV1.java
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/migrations/RestSinkMigrationV1.java
@@ -18,8 +18,6 @@
 
 package org.apache.streampipes.sinks.brokers.jvm.migrations;
 
-import java.util.List;
-
 import 
org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor;
 import org.apache.streampipes.extensions.api.migration.IDataSinkMigrator;
 import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
@@ -33,96 +31,109 @@ import org.apache.streampipes.sdk.StaticProperties;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sinks.brokers.jvm.rest.RestSink;
 
+import java.util.List;
+
 public class RestSinkMigrationV1 implements IDataSinkMigrator {
 
-       @Override
-       public ModelMigratorConfig config() {
-               return new ModelMigratorConfig(
-                               "org.apache.streampipes.sinks.brokers.jvm.rest",
-                               SpServiceTagPrefix.DATA_SINK,
-                               0,
-                               1
-               );
-       }
-
-       @Override
-       public MigrationResult<DataSinkInvocation> migrate(DataSinkInvocation 
element,
-                                                                               
                                                                                
                                                 IDataSinkParameterExtractor 
extractor) throws RuntimeException {
-               // Extract the URL value from the old configuration
-               String urlValue = 
extractor.singleValueParameter(RestSink.URL_KEY, String.class);
-               if (urlValue == null) {
-                       return MigrationResult.failure(element, "URL property 
not found in old configuration");
-               }
-
-               addRetryDelayKey(element);
-               addIsRetryEnabledKey(element);
-               addRetryMaxRetriesKey(element);
-               addHeaderCollectionKeys(element);
-
-               return MigrationResult.success(element);
-       }
-
-       public void addRetryDelayKey(DataSinkInvocation element) {
-               var label = Labels.from(RestSink.RETRY_DELAY_MS_KEY, "Retry 
Delay (ms)", "Duration in ms to wait for request to " +
-        "retry.");
-               var staticProperty = new RuntimeResolvableAnyStaticProperty(
-                               label.getInternalId(),
-                               label.getLabel(),
-                               label.getDescription()
-               );
-
-               element.getStaticProperties().add(staticProperty);
-       }
-
-       public void addIsRetryEnabledKey(DataSinkInvocation element) {
-               var label = Labels.from(RestSink.IS_RETRY_ENABLED_KEY, "Enable 
Retry",
-                               "If enabled, the request will be retried at an 
interval defined by the retry delay up to max retries.");
-               var staticProperty = new SlideToggleStaticProperty(
-                               label.getInternalId(),
-                               label.getLabel(),
-                               label.getDescription(),
-                               false
-               );
-               element.getStaticProperties().add(staticProperty);
-       }
-
-       public void addRetryMaxRetriesKey(DataSinkInvocation element) {
-               var label = Labels.from(RestSink.RETRY_MAX_RETRIES_KEY, "Max 
Retries",
-                               "The maximum number of retries allowed for 
request to retry.");
-               var staticProperty = new RuntimeResolvableAnyStaticProperty(
-                               label.getInternalId(),
-                               label.getLabel(),
-                               label.getDescription()
-               );
-               element.getStaticProperties().add(staticProperty);
-       }
-
-       public void addHeaderCollectionKeys(DataSinkInvocation element) {
-               StaticPropertyGroup staticPropertyGroup = new 
StaticPropertyGroup();
-               staticPropertyGroup.setLabel("Request Headers");
-               staticPropertyGroup.setDescription("Request Headers");
-               staticPropertyGroup.setInternalName(RestSink.HEADER_COLLECTION);
-               staticPropertyGroup.setHorizontalRendering(false);
-
-               var headerKeyLabel = StaticProperties.stringFreeTextProperty(
-                               Labels.from(RestSink.HEADER_KEY, "Header", 
"Optional custom headers to be included with the REST request.")
-               );
-               headerKeyLabel.setValue("");
-               headerKeyLabel.setOptional(true);
-
-               var headerValueLabel = StaticProperties.stringFreeTextProperty(
-                               Labels.from(RestSink.HEADER_VALUE, "Header 
Value", "Optional custom headers to be included with the REST " +
-            "request.")
-               );
-               headerValueLabel.setValue("");
-               headerValueLabel.setOptional(true);
-
-               staticPropertyGroup.getStaticProperties().addAll(
-                               List.of(
-                                               headerKeyLabel,
-                                               headerValueLabel
-                               )
-               );
-               element.getStaticProperties().add(staticPropertyGroup);
-       }
-}
\ No newline at end of file
+  @Override
+  public ModelMigratorConfig config() {
+    return new ModelMigratorConfig(
+        "org.apache.streampipes.sinks.brokers.jvm.rest",
+        SpServiceTagPrefix.DATA_SINK,
+        0,
+        1
+    );
+  }
+
+  @Override
+  public MigrationResult<DataSinkInvocation> migrate(DataSinkInvocation 
element,
+                                                     
IDataSinkParameterExtractor extractor) throws RuntimeException {
+    // Extract the URL value from the old configuration
+    String urlValue = extractor.singleValueParameter(RestSink.URL_KEY, 
String.class);
+    if (urlValue == null) {
+      return MigrationResult.failure(element, "URL property not found in old 
configuration");
+    }
+
+    addRetryDelayKey(element);
+    addIsRetryEnabledKey(element);
+    addRetryMaxRetriesKey(element);
+    addHeaderCollectionKeys(element);
+
+    return MigrationResult.success(element);
+  }
+
+  public void addRetryDelayKey(DataSinkInvocation element) {
+    var label = Labels.from(
+        RestSink.RETRY_DELAY_MS_KEY,
+        "Retry Delay (ms)", "Duration in ms to wait for request to "
+            + "retry."
+    );
+    var staticProperty = new RuntimeResolvableAnyStaticProperty(
+        label.getInternalId(),
+        label.getLabel(),
+        label.getDescription()
+    );
+
+    element.getStaticProperties().add(staticProperty);
+  }
+
+  public void addIsRetryEnabledKey(DataSinkInvocation element) {
+    var label = Labels.from(RestSink.IS_RETRY_ENABLED_KEY, "Enable Retry",
+        "If enabled, the request will be retried at an interval defined by the 
retry delay up to max retries.");
+    var staticProperty = new SlideToggleStaticProperty(
+        label.getInternalId(),
+        label.getLabel(),
+        label.getDescription(),
+        false
+    );
+    element.getStaticProperties().add(staticProperty);
+  }
+
+  public void addRetryMaxRetriesKey(DataSinkInvocation element) {
+    var label = Labels.from(RestSink.RETRY_MAX_RETRIES_KEY, "Max Retries",
+        "The maximum number of retries allowed for request to retry.");
+    var staticProperty = new RuntimeResolvableAnyStaticProperty(
+        label.getInternalId(),
+        label.getLabel(),
+        label.getDescription()
+    );
+    element.getStaticProperties().add(staticProperty);
+  }
+
+  public void addHeaderCollectionKeys(DataSinkInvocation element) {
+    StaticPropertyGroup staticPropertyGroup = new StaticPropertyGroup();
+    staticPropertyGroup.setLabel("Request Headers");
+    staticPropertyGroup.setDescription("Request Headers");
+    staticPropertyGroup.setInternalName(RestSink.HEADER_COLLECTION);
+    staticPropertyGroup.setHorizontalRendering(false);
+
+    var headerKeyLabel = StaticProperties.stringFreeTextProperty(
+        Labels.from(
+            RestSink.HEADER_KEY,
+            "Header",
+            "Optional custom headers to be included with the REST request."
+        )
+    );
+    headerKeyLabel.setValue("");
+    headerKeyLabel.setOptional(true);
+
+    var headerValueLabel = StaticProperties.stringFreeTextProperty(
+        Labels.from(
+            RestSink.HEADER_VALUE,
+            "Header Value",
+            "Optional custom headers to be included with the REST "
+                + "request."
+        )
+    );
+    headerValueLabel.setValue("");
+    headerValueLabel.setOptional(true);
+
+    staticPropertyGroup.getStaticProperties().addAll(
+        List.of(
+            headerKeyLabel,
+            headerValueLabel
+        )
+    );
+    element.getStaticProperties().add(staticPropertyGroup);
+  }
+}
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestHeaderConfiguration.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestHeaderConfiguration.java
index b27fec2e83..d3b94893ce 100644
--- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestHeaderConfiguration.java
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestHeaderConfiguration.java
@@ -19,7 +19,7 @@
 package org.apache.streampipes.sinks.brokers.jvm.rest;
 
 public record RestHeaderConfiguration(
-               String headerKey,
-               String headerValue
+    String headerKey,
+    String headerValue
 ) {
-}
\ No newline at end of file
+}
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestSink.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestSink.java
index 02ecda2b97..e1f96eea8f 100644
--- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestSink.java
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestSink.java
@@ -18,14 +18,6 @@
 
 package org.apache.streampipes.sinks.brokers.jvm.rest;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.http.HttpResponse;
-import org.apache.http.client.fluent.Request;
-import org.apache.http.client.fluent.Response;
-import org.apache.http.entity.ContentType;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.dataformat.JsonDataFormatDefinition;
 import 
org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor;
@@ -47,163 +39,172 @@ import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
+
+import org.apache.http.HttpResponse;
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+import org.apache.http.entity.ContentType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 public class RestSink implements IStreamPipesDataSink {
 
-       private static final Logger LOG = 
LoggerFactory.getLogger(RestSink.class);
-
-       public static final String ID = 
"org.apache.streampipes.sinks.brokers.jvm.rest";
-       public static final String URL_KEY = "url-key";
-       public static final String HEADER_COLLECTION = "header-collection";
-       public static final String HEADER_KEY = "header-key";
-       public static final String HEADER_VALUE = "header-value";
-       public static final String IS_RETRY_ENABLED_KEY = 
"is-retry-enabled-key";
-       public static final String RETRY_DELAY_MS_KEY = "retry-delay-ms-key";
-       public static final String RETRY_MAX_RETRIES_KEY = 
"retry-max-retries-key";
-       private String url;
-       private JsonDataFormatDefinition jsonDataFormatDefinition;
-       private List<RestHeaderConfiguration> headerConfigurations = new 
ArrayList<>();
-       private boolean isRetryEnabled;
-       private int maxRetries;
-       private int retryDelayMs;
-
-       @Override
-       public IDataSinkConfiguration declareConfig() {
-               return DataSinkConfiguration.create(
-                               RestSink::new,
-                               DataSinkBuilder.create(ID, 1)
-                                               .category(DataSinkType.FORWARD)
-                                               
.withAssets(ExtensionAssetType.DOCUMENTATION)
-                                               .withLocales(Locales.EN)
-                                               .requiredTextParameter(
-                                                               
Labels.withId(URL_KEY),
-                                                               false,
-                                                               false
-                                               )
-                                               .requiredSlideToggle(
-                                                               
Labels.withId(IS_RETRY_ENABLED_KEY),
-                                                               false
-                                               )
-                                               
.requiredStaticProperty(StaticProperties.integerFreeTextProperty(
-                                                               
Labels.withId(RETRY_DELAY_MS_KEY),
-                                                               100
-                                               ))
-                                               
.requiredStaticProperty(StaticProperties.integerFreeTextProperty(
-                                                               
Labels.withId(RETRY_MAX_RETRIES_KEY),
-                                                               3
-                                               ))
-                                               .requiredStaticProperty(
-                                                               
StaticProperties.collection(
-                                                                               
Labels.withId(HEADER_COLLECTION),
-                                                                               
false,
-                                                                               
StaticProperties.stringFreeTextProperty(Labels.withId(HEADER_KEY)),
-                                                                               
StaticProperties.stringFreeTextProperty(Labels.withId(HEADER_VALUE))
-                                                               )
-                                               )
-                                               
.requiredStream(StreamRequirementsBuilder
-                                                               .create()
-                                                               
.requiredProperty(EpRequirements.anyProperty())
-                                                               .build())
-                                               .build()
-               );
-       }
-
-       private List<RestHeaderConfiguration> 
getHeaderConfigurations(IDataSinkParameterExtractor extractor) {
-               List<RestHeaderConfiguration> headers = new ArrayList<>();
-               var csp = (CollectionStaticProperty) 
extractor.getStaticPropertyByName(HEADER_COLLECTION);
-
-               for (StaticProperty member : csp.getMembers()) {
-                       var memberExtractor = getMemberExtractor(member);
-
-                       var headerConfiguration = getHeaders(memberExtractor);
-
-                       headers.add(headerConfiguration);
-               }
-               return headers;
-       }
-
-       private StaticPropertyExtractor getMemberExtractor(StaticProperty 
member) {
-               return StaticPropertyExtractor.from(
-                               ((StaticPropertyGroup) 
member).getStaticProperties(),
-                               new ArrayList<>()
-               );
-       }
-
-       private RestHeaderConfiguration getHeaders(StaticPropertyExtractor 
memberExtractor) {
-               var headerKey = memberExtractor.textParameter(HEADER_KEY);
-               var headerValue = memberExtractor.textParameter(HEADER_VALUE);
-
-               return new RestHeaderConfiguration(headerKey, headerValue);
-       }
-
-       @Override
-       public void onPipelineStarted(IDataSinkParameters parameters,
-                                     EventSinkRuntimeContext runtimeContext) 
throws SpRuntimeException {
-               jsonDataFormatDefinition = new JsonDataFormatDefinition();
-               url = parameters.extractor().singleValueParameter(URL_KEY, 
String.class);
-               headerConfigurations = 
getHeaderConfigurations(parameters.extractor());
-               isRetryEnabled = 
parameters.extractor().slideToggleValue(IS_RETRY_ENABLED_KEY);
-               maxRetries = isRetryEnabled ? 
parameters.extractor().singleValueParameter(RETRY_DELAY_MS_KEY, Integer.class) 
: 0;
-               retryDelayMs = isRetryEnabled ? 
parameters.extractor().singleValueParameter(RETRY_DELAY_MS_KEY, Integer.class) 
: 0;
-       }
-
-       @Override
-       public void onEvent(Event event) {
-               try {
-                       // Set maximum attempts to 1 if not enabled
-                       int maxAttempts = 1;
-                       if (isRetryEnabled) {
-                               maxAttempts = maxRetries;
-                       }
-
-                       byte[] json = 
jsonDataFormatDefinition.fromMap(event.getRaw());
-                       for (int attempt = 0; attempt < maxAttempts; attempt++) 
{
-                               try {
-                                       Request request = Request.Post(url)
-                                                       .bodyByteArray(json, 
ContentType.APPLICATION_JSON)
-                                                       .connectTimeout(1000)
-                                                       .socketTimeout(100000);
-
-                                       for (RestHeaderConfiguration header : 
headerConfigurations) {
-                                               
request.addHeader(header.headerKey(), header.headerValue());
-                                       }
-                                       Response response = request.execute();
-                                       HttpResponse httpResponse = 
response.returnResponse();
-                                       int statusCode = 
httpResponse.getStatusLine().getStatusCode();
-                                       if (statusCode >= 200 && statusCode < 
300) {
-                                               LOG.info("Successfully sent 
event to {} with status {}", url, statusCode);
-                                               return;
-                                       } else if (statusCode >= 500) {
-                                               LOG.warn("Server error {} from 
{}, retrying... (attempt {}/{})", statusCode, url, attempt + 1, maxRetries);
-                                       } else {
-                                               LOG.error("Received status {} 
from {}, not retrying", statusCode, url);
-                                               return;
-                                       }
-                               } catch (IOException e) {
-                                       LOG.warn("IO error when sending to {}, 
retrying... (attempt {}/{}): {}", url, attempt + 1, maxRetries,
-                                                       e.getMessage());
-                               }
-                               if (attempt < maxRetries - 1) {
-                                       try {
-                                               Thread.sleep(retryDelayMs);
-                                       } catch (InterruptedException ie) {
-                                               
Thread.currentThread().interrupt();
-                                               LOG.error("Interrupted while 
waiting to retry");
-                                               return;
-                                       }
-                               } else {
-                                       LOG.error("Failed to send event to {} 
after {} attempts", url, maxRetries);
-                               }
-                       }
-               } catch (SpRuntimeException e) {
-                       LOG.error("Error while serializing event: {} Exception: 
{}", event.getSourceInfo().getSourceId(), e);
-               }
-       }
-
-       @Override
-       public void onPipelineStopped() throws SpRuntimeException {
-               this.headerConfigurations = null;
-       }
-}
\ No newline at end of file
+  private static final Logger LOG = LoggerFactory.getLogger(RestSink.class);
+
+  public static final String ID = 
"org.apache.streampipes.sinks.brokers.jvm.rest";
+  public static final String URL_KEY = "url-key";
+  public static final String HEADER_COLLECTION = "header-collection";
+  public static final String HEADER_KEY = "header-key";
+  public static final String HEADER_VALUE = "header-value";
+  public static final String IS_RETRY_ENABLED_KEY = "is-retry-enabled-key";
+  public static final String RETRY_DELAY_MS_KEY = "retry-delay-ms-key";
+  public static final String RETRY_MAX_RETRIES_KEY = "retry-max-retries-key";
+  private String url;
+  private JsonDataFormatDefinition jsonDataFormatDefinition;
+  private List<RestHeaderConfiguration> headerConfigurations = new 
ArrayList<>();
+  private boolean isRetryEnabled;
+  private int maxRetries;
+  private int retryDelayMs;
+
+  @Override
+  public IDataSinkConfiguration declareConfig() {
+    return DataSinkConfiguration.create(
+        RestSink::new,
+        DataSinkBuilder.create(ID, 1)
+            .category(DataSinkType.FORWARD)
+            .withAssets(ExtensionAssetType.DOCUMENTATION)
+            .withLocales(Locales.EN)
+            .requiredTextParameter(
+                Labels.withId(URL_KEY),
+                false,
+                false
+            )
+            .requiredSlideToggle(
+                Labels.withId(IS_RETRY_ENABLED_KEY),
+                false
+            )
+            .requiredStaticProperty(StaticProperties.integerFreeTextProperty(
+                Labels.withId(RETRY_DELAY_MS_KEY),
+                100
+            ))
+            .requiredStaticProperty(StaticProperties.integerFreeTextProperty(
+                Labels.withId(RETRY_MAX_RETRIES_KEY),
+                3
+            ))
+            .requiredStaticProperty(
+                StaticProperties.collection(
+                    Labels.withId(HEADER_COLLECTION),
+                    false,
+                    
StaticProperties.stringFreeTextProperty(Labels.withId(HEADER_KEY)),
+                    
StaticProperties.stringFreeTextProperty(Labels.withId(HEADER_VALUE))
+                )
+            )
+            .requiredStream(StreamRequirementsBuilder
+                .create()
+                .requiredProperty(EpRequirements.anyProperty())
+                .build())
+            .build()
+    );
+  }
+
+  private List<RestHeaderConfiguration> 
getHeaderConfigurations(IDataSinkParameterExtractor extractor) {
+    List<RestHeaderConfiguration> headers = new ArrayList<>();
+    var csp = (CollectionStaticProperty) 
extractor.getStaticPropertyByName(HEADER_COLLECTION);
+
+    for (StaticProperty member : csp.getMembers()) {
+      var memberExtractor = getMemberExtractor(member);
+
+      var headerConfiguration = getHeaders(memberExtractor);
+
+      headers.add(headerConfiguration);
+    }
+    return headers;
+  }
+
+  private StaticPropertyExtractor getMemberExtractor(StaticProperty member) {
+    return StaticPropertyExtractor.from(
+        ((StaticPropertyGroup) member).getStaticProperties(),
+        new ArrayList<>()
+    );
+  }
+
+  private RestHeaderConfiguration getHeaders(StaticPropertyExtractor 
memberExtractor) {
+    var headerKey = memberExtractor.textParameter(HEADER_KEY);
+    var headerValue = memberExtractor.textParameter(HEADER_VALUE);
+
+    return new RestHeaderConfiguration(headerKey, headerValue);
+  }
+
+  @Override
+  public void onPipelineStarted(IDataSinkParameters parameters,
+                                EventSinkRuntimeContext runtimeContext) throws 
SpRuntimeException {
+    jsonDataFormatDefinition = new JsonDataFormatDefinition();
+    url = parameters.extractor().singleValueParameter(URL_KEY, String.class);
+    headerConfigurations = getHeaderConfigurations(parameters.extractor());
+    isRetryEnabled = 
parameters.extractor().slideToggleValue(IS_RETRY_ENABLED_KEY);
+    maxRetries = isRetryEnabled ? 
parameters.extractor().singleValueParameter(RETRY_DELAY_MS_KEY, Integer.class) 
: 0;
+    retryDelayMs = isRetryEnabled ? 
parameters.extractor().singleValueParameter(RETRY_DELAY_MS_KEY, Integer.class) 
: 0;
+  }
+
+  @Override
+  public void onEvent(Event event) {
+    try {
+      // Set maximum attempts to 1 if not enabled
+      int maxAttempts = 1;
+      if (isRetryEnabled) {
+        maxAttempts = maxRetries;
+      }
+
+      byte[] json = jsonDataFormatDefinition.fromMap(event.getRaw());
+      for (int attempt = 0; attempt < maxAttempts; attempt++) {
+        try {
+          Request request = Request.Post(url)
+              .bodyByteArray(json, ContentType.APPLICATION_JSON)
+              .connectTimeout(1000)
+              .socketTimeout(100000);
+
+          for (RestHeaderConfiguration header : headerConfigurations) {
+            request.addHeader(header.headerKey(), header.headerValue());
+          }
+          Response response = request.execute();
+          HttpResponse httpResponse = response.returnResponse();
+          int statusCode = httpResponse.getStatusLine().getStatusCode();
+          if (statusCode >= 200 && statusCode < 300) {
+            LOG.info("Successfully sent event to {} with status {}", url, 
statusCode);
+            return;
+          } else if (statusCode >= 500) {
+            LOG.warn("Server error {} from {}, retrying... (attempt {}/{})", 
statusCode, url, attempt + 1, maxRetries);
+          } else {
+            LOG.error("Received status {} from {}, not retrying", statusCode, 
url);
+            return;
+          }
+        } catch (IOException e) {
+          LOG.warn("IO error when sending to {}, retrying... (attempt {}/{}): 
{}", url, attempt + 1, maxRetries,
+              e.getMessage());
+        }
+        if (attempt < maxRetries - 1) {
+          try {
+            Thread.sleep(retryDelayMs);
+          } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            LOG.error("Interrupted while waiting to retry");
+            return;
+          }
+        } else {
+          LOG.error("Failed to send event to {} after {} attempts", url, 
maxRetries);
+        }
+      }
+    } catch (SpRuntimeException e) {
+      LOG.error("Error while serializing event: {} Exception: {}", 
event.getSourceInfo().getSourceId(), e);
+    }
+  }
+
+  @Override
+  public void onPipelineStopped() throws SpRuntimeException {
+    this.headerConfigurations = null;
+  }
+}

Reply via email to