This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new d2afd28cc feat(connectors): add retry mechanism to connectors HTTP config provider (#2437)
d2afd28cc is described below
commit d2afd28ccc5c20bb0af443adc03cb0a559ff9b38
Author: tungtose <[email protected]>
AuthorDate: Wed Dec 3 20:31:40 2025 +0700
feat(connectors): add retry mechanism to connectors HTTP config provider (#2437)
This PR resolves #2416
---
Cargo.lock | 2 +
core/connectors/runtime/Cargo.toml | 2 +
core/connectors/runtime/README.md | 7 ++++
core/connectors/runtime/src/configs/connectors.rs | 1 +
.../src/configs/connectors/http_provider.rs | 31 +++++++++++++--
core/connectors/runtime/src/configs/runtime.rs | 46 +++++++++++++++++++++-
6 files changed, 84 insertions(+), 5 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 59712c563..d6ee2f530 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4717,6 +4717,8 @@ dependencies = [
"once_cell",
"postcard",
"reqwest",
+ "reqwest-middleware",
+ "reqwest-retry",
"serde",
"serde_json",
"serde_with",
diff --git a/core/connectors/runtime/Cargo.toml b/core/connectors/runtime/Cargo.toml
index 5dd541344..317314ed9 100644
--- a/core/connectors/runtime/Cargo.toml
+++ b/core/connectors/runtime/Cargo.toml
@@ -47,6 +47,8 @@ mimalloc = { workspace = true }
once_cell = { workspace = true }
postcard = { workspace = true }
reqwest = { workspace = true }
+reqwest-middleware = { workspace = true }
+reqwest-retry = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_with = { workspace = true }
diff --git a/core/connectors/runtime/README.md b/core/connectors/runtime/README.md
index 5dc83c94f..ac77dc287 100644
--- a/core/connectors/runtime/README.md
+++ b/core/connectors/runtime/README.md
@@ -65,6 +65,13 @@ timeout = "10s"
[connectors.request_headers]
api-key = "your-api-key"
+[connectors.retry]
+enabled = true
+max_attempts = 3
+initial_backoff = "1 s"
+max_backoff = "30 s"
+backoff_multiplier = 2
+
[connectors.url_templates]
# Optional: Customize URL templates for specific operations
# If not specified, default RESTful URL patterns are used
diff --git a/core/connectors/runtime/src/configs/connectors.rs b/core/connectors/runtime/src/configs/connectors.rs
index 67af54dc9..6bacdc7e4 100644
--- a/core/connectors/runtime/src/configs/connectors.rs
+++ b/core/connectors/runtime/src/configs/connectors.rs
@@ -242,6 +242,7 @@ pub async fn create_connectors_config_provider(
&config.request_headers,
&config.url_templates,
&config.response,
+ &config.retry,
)?;
Ok(Box::new(provider))
}
diff --git a/core/connectors/runtime/src/configs/connectors/http_provider.rs b/core/connectors/runtime/src/configs/connectors/http_provider.rs
index 41618289d..1df41225e 100644
--- a/core/connectors/runtime/src/configs/connectors/http_provider.rs
+++ b/core/connectors/runtime/src/configs/connectors/http_provider.rs
@@ -26,10 +26,12 @@ use crate::configs::connectors::{
ConnectorConfigVersions, ConnectorsConfig, ConnectorsConfigProvider, CreateSinkConfig,
CreateSourceConfig, SinkConfig, SourceConfig,
};
-use crate::configs::runtime::ResponseConfig;
+use crate::configs::runtime::{ResponseConfig, RetryConfig};
use crate::error::RuntimeError;
use async_trait::async_trait;
use reqwest;
+use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
+use reqwest_retry::{Jitter, RetryTransientMiddleware, policies::ExponentialBackoff};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::str::FromStr;
@@ -43,7 +45,7 @@ struct SetActiveVersionRequest {
pub struct HttpConnectorsConfigProvider {
url_builder: UrlBuilder,
response_extractor: ResponseExtractor,
- client: reqwest::Client,
+ client: ClientWithMiddleware,
}
impl HttpConnectorsConfigProvider {
@@ -53,6 +55,7 @@ impl HttpConnectorsConfigProvider {
request_headers: &HashMap<String, String>,
url_templates: &HashMap<String, String>,
response_config: &ResponseConfig,
+ retry_config: &RetryConfig,
) -> Result<Self, RuntimeError> {
let mut headers = reqwest::header::HeaderMap::new();
for (key, value) in request_headers {
@@ -75,13 +78,35 @@ impl HttpConnectorsConfigProvider {
RuntimeError::InvalidConfiguration(format!("Failed to build HTTP client: {err}"))
})?;
+ let mut client_with_middleware = ClientBuilder::new(client);
+
+ if retry_config.enabled {
+ tracing::trace!("Apply retry config: {:?}", retry_config);
+
+ let retry_policy = ExponentialBackoff::builder()
+ .retry_bounds(
+ retry_config.initial_backoff.get_duration(),
+ retry_config.max_backoff.get_duration(),
+ )
+ .base(retry_config.backoff_multiplier)
+ .jitter(Jitter::Bounded)
+ .build_with_max_retries(retry_config.max_attempts);
+
+ let retry_transient_middleware =
+ RetryTransientMiddleware::new_with_policy(retry_policy);
+
+ client_with_middleware = client_with_middleware.with(retry_transient_middleware);
+ }
+
+ let final_client = client_with_middleware.build();
+
let url_builder = UrlBuilder::new(base_url, url_templates);
let response_extractor = ResponseExtractor::new(response_config);
Ok(Self {
url_builder,
response_extractor,
- client,
+ client: final_client,
})
}
diff --git a/core/connectors/runtime/src/configs/runtime.rs b/core/connectors/runtime/src/configs/runtime.rs
index 720204f07..f9ef28c90 100644
--- a/core/connectors/runtime/src/configs/runtime.rs
+++ b/core/connectors/runtime/src/configs/runtime.rs
@@ -53,6 +53,44 @@ pub struct IggyTlsConfig {
pub domain: Option<String>,
}
+#[serde_as]
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct RetryConfig {
+ pub enabled: bool,
+ pub max_attempts: u32,
+ #[serde_as(as = "DisplayFromStr")]
+ pub initial_backoff: IggyDuration,
+ #[serde_as(as = "DisplayFromStr")]
+ pub max_backoff: IggyDuration,
+ pub backoff_multiplier: u32,
+}
+
+impl Default for RetryConfig {
+ fn default() -> Self {
+ Self {
+ enabled: true,
+ max_attempts: 3,
+ initial_backoff: IggyDuration::new_from_secs(1),
+ max_backoff: IggyDuration::new_from_secs(30),
+ backoff_multiplier: 2,
+ }
+ }
+}
+
+impl Display for RetryConfig {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ write!(
+ f,
+ "{{ enabled: {}, max_attempts: {}, initial_backoff: {}, max_backoff: {}, backoff_multiplier: {} }}",
+ self.enabled,
+ self.max_attempts,
+ self.initial_backoff,
+ self.max_backoff,
+ self.backoff_multiplier
+ )
+ }
+}
+
#[derive(Debug, Default, Clone, Deserialize, Serialize)]
#[serde(default)]
pub struct LocalConnectorsConfig {
@@ -72,18 +110,21 @@ pub struct HttpConnectorsConfig {
pub url_templates: HashMap<String, String>,
#[serde(default)]
pub response: ResponseConfig,
+ #[serde(default)]
+ pub retry: RetryConfig,
}
impl Display for HttpConnectorsConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
- "{{ type: \"http\", base_url: {:?}, request_headers: {:?}, timeout: {}, url_templates: {:?}, response: {:?} }}",
+ "{{ type: \"http\", base_url: {:?}, request_headers: {:?}, timeout: {}, url_templates: {:?}, response: {:?}, retry: {} }}",
self.base_url,
self.request_headers.keys(),
self.timeout,
self.url_templates,
- self.response
+ self.response,
+ self.retry
)
}
}
@@ -99,6 +140,7 @@ pub struct ResponseConfig {
pub error_path: Option<String>,
}
+#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(tag = "config_type", rename_all = "lowercase")]
pub enum ConnectorsConfig {