This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new d2afd28cc feat(connectors): add retry mechanism to connectors HTTP config provider (#2437)
d2afd28cc is described below

commit d2afd28ccc5c20bb0af443adc03cb0a559ff9b38
Author: tungtose <[email protected]>
AuthorDate: Wed Dec 3 20:31:40 2025 +0700

    feat(connectors): add retry mechanism to connectors HTTP config provider (#2437)
    
    This PR resolves #2416
---
 Cargo.lock                                         |  2 +
 core/connectors/runtime/Cargo.toml                 |  2 +
 core/connectors/runtime/README.md                  |  7 ++++
 core/connectors/runtime/src/configs/connectors.rs  |  1 +
 .../src/configs/connectors/http_provider.rs        | 31 +++++++++++++--
 core/connectors/runtime/src/configs/runtime.rs     | 46 +++++++++++++++++++++-
 6 files changed, 84 insertions(+), 5 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 59712c563..d6ee2f530 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4717,6 +4717,8 @@ dependencies = [
  "once_cell",
  "postcard",
  "reqwest",
+ "reqwest-middleware",
+ "reqwest-retry",
  "serde",
  "serde_json",
  "serde_with",
diff --git a/core/connectors/runtime/Cargo.toml b/core/connectors/runtime/Cargo.toml
index 5dd541344..317314ed9 100644
--- a/core/connectors/runtime/Cargo.toml
+++ b/core/connectors/runtime/Cargo.toml
@@ -47,6 +47,8 @@ mimalloc = { workspace = true }
 once_cell = { workspace = true }
 postcard = { workspace = true }
 reqwest = { workspace = true }
+reqwest-middleware = { workspace = true }
+reqwest-retry = { workspace = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
 serde_with = { workspace = true }
diff --git a/core/connectors/runtime/README.md b/core/connectors/runtime/README.md
index 5dc83c94f..ac77dc287 100644
--- a/core/connectors/runtime/README.md
+++ b/core/connectors/runtime/README.md
@@ -65,6 +65,13 @@ timeout = "10s"
 [connectors.request_headers]
 api-key = "your-api-key"
 
+[connectors.retry]
+enabled = true
+max_attempts = 3
+initial_backoff = "1 s"
+max_backoff = "30 s"
+backoff_multiplier = 2
+
 [connectors.url_templates]
 # Optional: Customize URL templates for specific operations
 # If not specified, default RESTful URL patterns are used
diff --git a/core/connectors/runtime/src/configs/connectors.rs b/core/connectors/runtime/src/configs/connectors.rs
index 67af54dc9..6bacdc7e4 100644
--- a/core/connectors/runtime/src/configs/connectors.rs
+++ b/core/connectors/runtime/src/configs/connectors.rs
@@ -242,6 +242,7 @@ pub async fn create_connectors_config_provider(
                 &config.request_headers,
                 &config.url_templates,
                 &config.response,
+                &config.retry,
             )?;
             Ok(Box::new(provider))
         }
diff --git a/core/connectors/runtime/src/configs/connectors/http_provider.rs b/core/connectors/runtime/src/configs/connectors/http_provider.rs
index 41618289d..1df41225e 100644
--- a/core/connectors/runtime/src/configs/connectors/http_provider.rs
+++ b/core/connectors/runtime/src/configs/connectors/http_provider.rs
@@ -26,10 +26,12 @@ use crate::configs::connectors::{
     ConnectorConfigVersions, ConnectorsConfig, ConnectorsConfigProvider, CreateSinkConfig,
     CreateSourceConfig, SinkConfig, SourceConfig,
 };
-use crate::configs::runtime::ResponseConfig;
+use crate::configs::runtime::{ResponseConfig, RetryConfig};
 use crate::error::RuntimeError;
 use async_trait::async_trait;
 use reqwest;
+use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
+use reqwest_retry::{Jitter, RetryTransientMiddleware, policies::ExponentialBackoff};
 use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 use std::str::FromStr;
@@ -43,7 +45,7 @@ struct SetActiveVersionRequest {
 pub struct HttpConnectorsConfigProvider {
     url_builder: UrlBuilder,
     response_extractor: ResponseExtractor,
-    client: reqwest::Client,
+    client: ClientWithMiddleware,
 }
 
 impl HttpConnectorsConfigProvider {
@@ -53,6 +55,7 @@ impl HttpConnectorsConfigProvider {
         request_headers: &HashMap<String, String>,
         url_templates: &HashMap<String, String>,
         response_config: &ResponseConfig,
+        retry_config: &RetryConfig,
     ) -> Result<Self, RuntimeError> {
         let mut headers = reqwest::header::HeaderMap::new();
         for (key, value) in request_headers {
@@ -75,13 +78,35 @@ impl HttpConnectorsConfigProvider {
                 RuntimeError::InvalidConfiguration(format!("Failed to build HTTP client: {err}"))
             })?;
 
+        let mut client_with_middleware = ClientBuilder::new(client);
+
+        if retry_config.enabled {
+            tracing::trace!("Apply retry config: {:?}", retry_config);
+
+            let retry_policy = ExponentialBackoff::builder()
+                .retry_bounds(
+                    retry_config.initial_backoff.get_duration(),
+                    retry_config.max_backoff.get_duration(),
+                )
+                .base(retry_config.backoff_multiplier)
+                .jitter(Jitter::Bounded)
+                .build_with_max_retries(retry_config.max_attempts);
+
+            let retry_transient_middleware =
+                RetryTransientMiddleware::new_with_policy(retry_policy);
+
+            client_with_middleware = client_with_middleware.with(retry_transient_middleware);
+        }
+
+        let final_client = client_with_middleware.build();
+
         let url_builder = UrlBuilder::new(base_url, url_templates);
         let response_extractor = ResponseExtractor::new(response_config);
 
         Ok(Self {
             url_builder,
             response_extractor,
-            client,
+            client: final_client,
         })
     }
 
diff --git a/core/connectors/runtime/src/configs/runtime.rs b/core/connectors/runtime/src/configs/runtime.rs
index 720204f07..f9ef28c90 100644
--- a/core/connectors/runtime/src/configs/runtime.rs
+++ b/core/connectors/runtime/src/configs/runtime.rs
@@ -53,6 +53,44 @@ pub struct IggyTlsConfig {
     pub domain: Option<String>,
 }
 
+#[serde_as]
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct RetryConfig {
+    pub enabled: bool,
+    pub max_attempts: u32,
+    #[serde_as(as = "DisplayFromStr")]
+    pub initial_backoff: IggyDuration,
+    #[serde_as(as = "DisplayFromStr")]
+    pub max_backoff: IggyDuration,
+    pub backoff_multiplier: u32,
+}
+
+impl Default for RetryConfig {
+    fn default() -> Self {
+        Self {
+            enabled: true,
+            max_attempts: 3,
+            initial_backoff: IggyDuration::new_from_secs(1),
+            max_backoff: IggyDuration::new_from_secs(30),
+            backoff_multiplier: 2,
+        }
+    }
+}
+
+impl Display for RetryConfig {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "{{ enabled: {}, max_attempts: {}, initial_backoff: {}, max_backoff: {}, backoff_multiplier: {} }}",
+            self.enabled,
+            self.max_attempts,
+            self.initial_backoff,
+            self.max_backoff,
+            self.backoff_multiplier
+        )
+    }
+}
+
 #[derive(Debug, Default, Clone, Deserialize, Serialize)]
 #[serde(default)]
 pub struct LocalConnectorsConfig {
@@ -72,18 +110,21 @@ pub struct HttpConnectorsConfig {
     pub url_templates: HashMap<String, String>,
     #[serde(default)]
     pub response: ResponseConfig,
+    #[serde(default)]
+    pub retry: RetryConfig,
 }
 
 impl Display for HttpConnectorsConfig {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         write!(
             f,
-            "{{ type: \"http\", base_url: {:?}, request_headers: {:?}, timeout: {}, url_templates: {:?}, response: {:?} }}",
+            "{{ type: \"http\", base_url: {:?}, request_headers: {:?}, timeout: {}, url_templates: {:?}, response: {:?}, retry: {} }}",
             self.base_url,
             self.request_headers.keys(),
             self.timeout,
             self.url_templates,
-            self.response
+            self.response,
+            self.retry
         )
     }
 }
@@ -99,6 +140,7 @@ pub struct ResponseConfig {
     pub error_path: Option<String>,
 }
 
+#[allow(clippy::large_enum_variant)]
 #[derive(Debug, Clone, Deserialize, Serialize)]
 #[serde(tag = "config_type", rename_all = "lowercase")]
 pub enum ConnectorsConfig {

Reply via email to