This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 57b94e82de #4113 - Improve error handling (#4115)
57b94e82de is described below
commit 57b94e82dedf52c04ebb5d151ccc14f856d95f03
Author: Stefan Obermeier <[email protected]>
AuthorDate: Mon Jan 26 13:35:43 2026 +0100
#4113 - Improve error handling (#4115)
Co-authored-by: obermeier <[email protected]>
---
.../notifications/jvm/msteams/MSTeamsSink.java | 84 ++++++++++++++++------
.../notifications/jvm/msteams/TestMSTeamsSink.java | 6 +-
2 files changed, 64 insertions(+), 26 deletions(-)
diff --git a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/msteams/MSTeamsSink.java b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/msteams/MSTeamsSink.java
index 10129f3634..e62cf725e8 100644
--- a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/msteams/MSTeamsSink.java
+++ b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/msteams/MSTeamsSink.java
@@ -40,18 +40,20 @@ import org.apache.streampipes.wrapper.standalone.StreamPipesNotificationSink;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.Header;
import org.apache.http.HttpHost;
-import org.apache.http.HttpStatus;
-import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
+import java.time.Duration;
import java.util.Map;
public class MSTeamsSink extends StreamPipesNotificationSink {
@@ -70,12 +72,15 @@ public class MSTeamsSink extends StreamPipesNotificationSink {
public static final String KEY_PROXY_GROUP = "proxyConfigurationGroup";
public static final String KEY_PROXY_URL = "proxyUrl";
protected static final String SIMPLE_MESSAGE_TEMPLATE = "{\"text\": \"%s\"}";
+ private static final int MAX_RETRIES = 5;
+ private static final int HTTP_TOO_MANY_REQUESTS = 429;
+ private static final Duration BASE_BACKOFF = Duration.ofSeconds(1);
private String messageContent;
private boolean isSimpleMessageMode;
private String webhookUrl;
private ObjectMapper objectMapper;
- private HttpClient httpClient;
+ private CloseableHttpClient httpClient;
public MSTeamsSink() {
super();
@@ -194,8 +199,12 @@ public class MSTeamsSink extends StreamPipesNotificationSink {
@Override
public void onPipelineStopped() {
- // nothing to do
- }
+ try {
+ this.httpClient.close();
+ } catch (IOException e) {
+ throw new SpRuntimeException("Error when closing MSTeams client:
%s".formatted(e.getMessage()));
+ }
+ }
/**
* Creates a JSON string intended for the MS Teams Webhook URL based on the provided plain message content.
@@ -240,30 +249,59 @@ public class MSTeamsSink extends StreamPipesNotificationSink {
/**
* Sends a payload to a webhook using the provided HTTP client, payload, and webhook URL.
*
- * @param httpClient The HTTP client used to send the payload.
+ * @param mockedClient The HTTP client used to send the payload.
* @param payload The payload to be sent to the webhook.
* @param webhookUrl The URL of the webhook to which the payload will be sent.
* @throws SpRuntimeException If an I/O error occurs while sending the payload to the webhook or
* the payload sent is not accepted by the API.
*/
- protected void sendPayloadToWebhook(HttpClient httpClient, String payload,
String webhookUrl) {
- try {
- var contentEntity = new StringEntity(payload);
- contentEntity.setContentType(ContentType.APPLICATION_JSON.toString());
-
- var postRequest = new HttpPost(webhookUrl);
- postRequest.setEntity(contentEntity);
-
- var result = httpClient.execute(postRequest);
- if (result.getStatusLine()
- .getStatusCode() == HttpStatus.SC_BAD_REQUEST) {
- throw new SpRuntimeException(
- "The provided message payload was not accepted by the MS Teams
API: %s"
- .formatted(payload)
- );
+ protected void sendPayloadToWebhook(CloseableHttpClient mockedClient, String
payload, String webhookUrl) {
+
+ for (int attempt = 1; ; attempt++) {
+ HttpPost request = new HttpPost(webhookUrl);
+ request.setEntity(new StringEntity(payload,
ContentType.APPLICATION_JSON));
+
+ if (Thread.currentThread().isInterrupted()) {
+ throw new SpRuntimeException("Interrupted while sending MS Teams
webhook");
+ }
+
+ try (CloseableHttpResponse response = mockedClient.execute(request)) {
+ int status = response.getStatusLine().getStatusCode();
+ if (status >= 200 && status < 300) {
+ return;
+ }
+
+ if (status != HTTP_TOO_MANY_REQUESTS && (status < 500 || status >=
600)) {
+ throw new SpRuntimeException("MS Teams webhook rejected request
(status=%d)".formatted(status));
+ }
+
+ if (attempt > MAX_RETRIES) {
+ throw new SpRuntimeException("MS Teams webhook failed after %d
attempts (status=%d)"
+ .formatted(attempt - 1, status));
+ }
+
+ long backoffMs = BASE_BACKOFF.toMillis() << Math.min(attempt, 6);
+
+ Header retryAfter = response.getFirstHeader("Retry-After");
+ if (retryAfter != null) {
+ try {
+ backoffMs = Long.parseLong(retryAfter.getValue()) * 1000;
+ } catch (NumberFormatException ignored) {}
+ }
+
+ Thread.sleep(backoffMs);
+ } catch (IOException | InterruptedException e) {
+ if (attempt > MAX_RETRIES) {
+ throw new SpRuntimeException("I/O error sending MS Teams webhook",
e);
+ }
+
+ try {
+ Thread.sleep(BASE_BACKOFF.toMillis() << Math.min(attempt, 6));
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new SpRuntimeException("Interrupted while retrying MS Teams
webhook", ie);
+ }
}
- } catch (IOException e) {
- throw new SpRuntimeException("Sending notification to MS Teams failed.",
e);
}
}
diff --git a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/test/java/org/apache/streampipes/sinks/notifications/jvm/msteams/TestMSTeamsSink.java b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/test/java/org/apache/streampipes/sinks/notifications/jvm/msteams/TestMSTeamsSink.java
index 58bdb297f6..3ad8b6b684 100644
--- a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/test/java/org/apache/streampipes/sinks/notifications/jvm/msteams/TestMSTeamsSink.java
+++ b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/test/java/org/apache/streampipes/sinks/notifications/jvm/msteams/TestMSTeamsSink.java
@@ -22,10 +22,10 @@ import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.http.HttpStatus;
import org.apache.http.StatusLine;
-import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
+import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -72,7 +72,7 @@ public class TestMSTeamsSink {
@Test
public void sendPayloadToWebhook() throws IOException {
- var mockedClient = mock(HttpClient.class);
+ var mockedClient = mock(CloseableHttpClient.class);
var mockedResponse = mock(CloseableHttpResponse.class);
var mockedStatusLine = mock(StatusLine.class);
var argumentCaptor = ArgumentCaptor.forClass(HttpPost.class);
@@ -102,7 +102,7 @@ public class TestMSTeamsSink {
@Test
public void sendPayloadToWebhookBadResponse() throws IOException {
- var mockedClient = mock(HttpClient.class);
+ CloseableHttpClient mockedClient = mock(CloseableHttpClient.class);
var mockedResponse = mock(CloseableHttpResponse.class);
var mockedStatusLine = mock(StatusLine.class);