This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 57b94e82de #4113 - Improve error handling (#4115)
57b94e82de is described below

commit 57b94e82dedf52c04ebb5d151ccc14f856d95f03
Author: Stefan Obermeier <[email protected]>
AuthorDate: Mon Jan 26 13:35:43 2026 +0100

    #4113 - Improve error handling (#4115)
    
    Co-authored-by: obermeier <[email protected]>
---
 .../notifications/jvm/msteams/MSTeamsSink.java     | 84 ++++++++++++++++------
 .../notifications/jvm/msteams/TestMSTeamsSink.java |  6 +-
 2 files changed, 64 insertions(+), 26 deletions(-)

diff --git a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/msteams/MSTeamsSink.java b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/msteams/MSTeamsSink.java
index 10129f3634..e62cf725e8 100644
--- a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/msteams/MSTeamsSink.java
+++ b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/msteams/MSTeamsSink.java
@@ -40,18 +40,20 @@ import org.apache.streampipes.wrapper.standalone.StreamPipesNotificationSink;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.Header;
 import org.apache.http.HttpHost;
-import org.apache.http.HttpStatus;
-import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.entity.ContentType;
 import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.impl.client.HttpClients;
 
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.time.Duration;
 import java.util.Map;
 
 public class MSTeamsSink extends StreamPipesNotificationSink {
@@ -70,12 +72,15 @@ public class MSTeamsSink extends StreamPipesNotificationSink {
   public static final String KEY_PROXY_GROUP = "proxyConfigurationGroup";
   public static final String KEY_PROXY_URL = "proxyUrl";
   protected static final String SIMPLE_MESSAGE_TEMPLATE = "{\"text\": \"%s\"}";
+  private static final int MAX_RETRIES = 5;
+  private static final int HTTP_TOO_MANY_REQUESTS = 429;
+  private static final Duration BASE_BACKOFF = Duration.ofSeconds(1);
 
   private String messageContent;
   private boolean isSimpleMessageMode;
   private String webhookUrl;
   private ObjectMapper objectMapper;
-  private HttpClient httpClient;
+  private CloseableHttpClient httpClient;
 
   public MSTeamsSink() {
     super();
@@ -194,8 +199,12 @@ public class MSTeamsSink extends StreamPipesNotificationSink {
 
   @Override
   public void onPipelineStopped() {
-    // nothing to do
-  }
+    try {
+        this.httpClient.close();
+    } catch (IOException e) {
+        throw new SpRuntimeException("Error when closing MSTeams client: %s".formatted(e.getMessage()));
+    }
+   }
 
   /**
    * Creates a JSON string intended for the MS Teams Webhook URL based on the provided plain message content.
@@ -240,30 +249,59 @@ public class MSTeamsSink extends StreamPipesNotificationSink {
   /**
    * Sends a payload to a webhook using the provided HTTP client, payload, and webhook URL.
    *
-   * @param httpClient The HTTP client used to send the payload.
+   * @param mockedClient The HTTP client used to send the payload.
    * @param payload    The payload to be sent to the webhook.
    * @param webhookUrl The URL of the webhook to which the payload will be sent.
    * @throws SpRuntimeException If an I/O error occurs while sending the payload to the webhook or
    *                            the payload sent is not accepted by the API.
    */
-  protected void sendPayloadToWebhook(HttpClient httpClient, String payload, String webhookUrl) {
-    try {
-      var contentEntity = new StringEntity(payload);
-      contentEntity.setContentType(ContentType.APPLICATION_JSON.toString());
-
-      var postRequest = new HttpPost(webhookUrl);
-      postRequest.setEntity(contentEntity);
-
-      var result = httpClient.execute(postRequest);
-      if (result.getStatusLine()
-          .getStatusCode() == HttpStatus.SC_BAD_REQUEST) {
-        throw new SpRuntimeException(
-            "The provided message payload was not accepted by the MS Teams API: %s"
-                .formatted(payload)
-        );
+  protected void sendPayloadToWebhook(CloseableHttpClient mockedClient, String payload, String webhookUrl) {
+
+    for (int attempt = 1; ; attempt++) {
+      HttpPost request = new HttpPost(webhookUrl);
+      request.setEntity(new StringEntity(payload, ContentType.APPLICATION_JSON));
+
+      if (Thread.currentThread().isInterrupted()) {
+        throw new SpRuntimeException("Interrupted while sending MS Teams webhook");
+      }
+
+      try (CloseableHttpResponse response = mockedClient.execute(request)) {
+        int status = response.getStatusLine().getStatusCode();
+        if (status >= 200 && status < 300) {
+          return;
+        }
+
+        if (status != HTTP_TOO_MANY_REQUESTS && (status < 500 || status >= 600)) {
+          throw new SpRuntimeException("MS Teams webhook rejected request (status=%d)".formatted(status));
+        }
+
+        if (attempt > MAX_RETRIES) {
+          throw new SpRuntimeException("MS Teams webhook failed after %d attempts (status=%d)"
+            .formatted(attempt - 1, status));
+        }
+
+        long backoffMs = BASE_BACKOFF.toMillis() << Math.min(attempt, 6);
+
+        Header retryAfter = response.getFirstHeader("Retry-After");
+        if (retryAfter != null) {
+        try {
+            backoffMs = Long.parseLong(retryAfter.getValue()) * 1000;
+          } catch (NumberFormatException ignored) {}
+        }
+
+        Thread.sleep(backoffMs);
+      } catch (IOException | InterruptedException e) {
+        if (attempt > MAX_RETRIES) {
+          throw new SpRuntimeException("I/O error sending MS Teams webhook", e);
+        }
+
+        try {
+          Thread.sleep(BASE_BACKOFF.toMillis() << Math.min(attempt, 6));
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+          throw new SpRuntimeException("Interrupted while retrying MS Teams webhook", ie);
+        }
       }
-    } catch (IOException e) {
-      throw new SpRuntimeException("Sending notification to MS Teams failed.", e);
     }
   }
 
diff --git a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/test/java/org/apache/streampipes/sinks/notifications/jvm/msteams/TestMSTeamsSink.java b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/test/java/org/apache/streampipes/sinks/notifications/jvm/msteams/TestMSTeamsSink.java
index 58bdb297f6..3ad8b6b684 100644
--- a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/test/java/org/apache/streampipes/sinks/notifications/jvm/msteams/TestMSTeamsSink.java
+++ b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/test/java/org/apache/streampipes/sinks/notifications/jvm/msteams/TestMSTeamsSink.java
@@ -22,10 +22,10 @@ import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 
 import org.apache.http.HttpStatus;
 import org.apache.http.StatusLine;
-import org.apache.http.client.HttpClient;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.entity.ContentType;
+import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.util.EntityUtils;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -72,7 +72,7 @@ public class TestMSTeamsSink {
   @Test
   public void sendPayloadToWebhook() throws IOException {
 
-    var mockedClient = mock(HttpClient.class);
+    var mockedClient = mock(CloseableHttpClient.class);
     var mockedResponse = mock(CloseableHttpResponse.class);
     var mockedStatusLine = mock(StatusLine.class);
     var argumentCaptor = ArgumentCaptor.forClass(HttpPost.class);
@@ -102,7 +102,7 @@ public class TestMSTeamsSink {
 
   @Test
   public void sendPayloadToWebhookBadResponse() throws  IOException {
-    var mockedClient = mock(HttpClient.class);
+    CloseableHttpClient mockedClient = mock(CloseableHttpClient.class);
     var mockedResponse = mock(CloseableHttpResponse.class);
     var mockedStatusLine = mock(StatusLine.class);
 

Reply via email to