tustvold commented on code in PR #5806:
URL: https://github.com/apache/arrow-rs/pull/5806#discussion_r1617517276
##########
object_store/src/azure/builder.rs:
##########
@@ -806,6 +809,12 @@ impl MicrosoftAzureBuilder {
self
}
+ /// Set the number of permits for the semaphore controlling concurrency.
+ pub fn with_sempahore_permits(mut self, permits: usize) -> Self {
Review Comment:
See above, I would call this a concurrency limit and move it onto
ClientOptions
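A rough sketch of what that could look like on `ClientOptions` (the field and method names here are only an illustration, not a final API):
```rust
// Illustrative sketch only: an optional limit stored on ClientOptions, where
// `None` (the default) means requests are not throttled at all.
#[derive(Debug, Clone, Default)]
pub struct ClientOptions {
    // ...existing options elided...
    concurrency_limit: Option<usize>,
}

impl ClientOptions {
    /// Limit the maximum number of concurrent HTTP requests
    pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
        self.concurrency_limit = Some(limit);
        self
    }
}
```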
##########
object_store/src/client/retry.rs:
##########
@@ -368,21 +399,29 @@ impl RetryExt for reqwest::RequestBuilder {
}
}
- fn send_retry(self, config: &RetryConfig) -> BoxFuture<'static, Result<Response>> {
- let request = self.retryable(config);
- Box::pin(async move { request.send().await })
+ fn send_retry(self, ctx: &RequestContext) -> BoxFuture<'static, Result<Response>> {
Review Comment:
I think we should also update `retryable` above
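Presumably that would mean giving `retryable` the same context as the new `send_retry`, roughly along these lines (stub types standing in for the real ones in retry.rs; the exact shape is an assumption):
```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

// Stand-ins for the real definitions in object_store/src/client/retry.rs
pub struct RetryConfig;
pub struct RequestContext {
    pub config: RetryConfig,
    pub semaphore: Arc<Semaphore>,
}
pub struct RetryableRequest {
    // request builder, backoff state, etc. elided
    pub semaphore: Arc<Semaphore>,
}

pub trait RetryExt {
    /// Build a retryable request that carries the shared semaphore,
    /// mirroring the updated `send_retry` signature
    fn retryable(self, ctx: &RequestContext) -> RetryableRequest;
}
```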
##########
object_store/src/aws/builder.rs:
##########
@@ -824,6 +827,12 @@ impl AmazonS3Builder {
self
}
+ /// Set the number of permits for the semaphore controlling concurrency.
+ pub fn with_sempahore_permits(mut self, permits: usize) -> Self {
Review Comment:
```suggestion
/// Limit the maximum number of concurrent HTTP requests.
pub fn with_concurrency_limit(mut self, permits: usize) -> Self {
```
I would also suggest moving this onto ClientOptions
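Hypothetical usage if the limit lives on `ClientOptions` (the `with_concurrency_limit` method is assumed; the rest is the existing builder API):
```rust
use object_store::{aws::AmazonS3Builder, ClientOptions};

fn main() -> Result<(), object_store::Error> {
    // `with_concurrency_limit` is the method suggested above and does not
    // exist yet; everything else is the current object_store builder API
    let options = ClientOptions::new().with_concurrency_limit(32);
    let _store = AmazonS3Builder::from_env()
        .with_bucket_name("my-bucket")
        .with_client_options(options)
        .build()?;
    Ok(())
}
```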
##########
object_store/src/client/retry.rs:
##########
@@ -368,21 +399,29 @@ impl RetryExt for reqwest::RequestBuilder {
}
}
- fn send_retry(self, config: &RetryConfig) -> BoxFuture<'static, Result<Response>> {
- let request = self.retryable(config);
- Box::pin(async move { request.send().await })
+ fn send_retry(self, ctx: &RequestContext) -> BoxFuture<'static, Result<Response>> {
+ let request = self.retryable(&ctx.config);
+ let semaphore = Arc::clone(&ctx.semaphore);
+ Box::pin(async move {
+ let permit = semaphore.acquire_owned().await.unwrap();
Review Comment:
When a request backs off, it will currently continue to hold the semaphore. I
think we should avoid this by pushing the semaphore acquisition into
RetryableRequest::send
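Something like the following pattern, i.e. taking a fresh permit for each attempt and releasing it before sleeping, so the slot is free for other requests during backoff (a self-contained sketch with placeholder request and backoff logic, not the actual retry.rs code):
```rust
use std::{sync::Arc, time::Duration};
use tokio::sync::Semaphore;

// Placeholder for the real HTTP request
async fn do_request() -> Result<(), ()> {
    Ok(())
}

async fn send_with_limit(semaphore: Arc<Semaphore>, max_retries: usize) -> Result<(), ()> {
    let mut retries = 0;
    loop {
        // Hold a permit only for the duration of a single attempt
        let permit = semaphore.clone().acquire_owned().await.expect("semaphore closed");
        match do_request().await {
            Ok(r) => return Ok(r),
            Err(e) if retries >= max_retries => return Err(e),
            Err(_) => {
                // Release the permit before backing off so other requests
                // are not starved while this one sleeps
                drop(permit);
                retries += 1;
                // Fixed delay for illustration; the real code uses Backoff
                tokio::time::sleep(Duration::from_millis(100)).await;
            }
        }
    }
}
```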
##########
object_store/src/client/retry.rs:
##########
@@ -119,6 +121,35 @@ impl From<Error> for std::io::Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
+/// Holds the configuration and controls the concurrency for retry requests.
+///
+/// `RequestContext` contains the retry configuration and a semaphore to limit
+/// the number of concurrent retry requests, ensuring that requests do not
+/// overwhelm the server.
+#[derive(Debug, Clone)]
+pub struct RequestContext {
+ /// Configuration for retrying requests.
+ ///
+ /// This configuration defines how retries should be handled, including
+ /// the backoff strategy and maximum retry limits.
+ pub config: RetryConfig,
+
+ /// Semaphore to limit the number of concurrent retry requests.
+ ///
+ /// The semaphore controls the number of retry requests that can be executed
+ /// concurrently, preventing too many requests from being sent at once.
+ pub semaphore: Arc<Semaphore>,
+}
+
+impl Default for RequestContext {
+ fn default() -> Self {
+ Self {
+ config: RetryConfig::default(),
+ semaphore: Arc::new(Semaphore::new(5)),
Review Comment:
I think there should be no concurrency limit by default, as it will be very
application specific
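For example by making the semaphore optional, roughly as below (`RetryConfig` is stubbed out here; whether the final design uses an `Option` or something else is an open question):
```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

// Stand-in for object_store's RetryConfig
#[derive(Debug, Clone, Default)]
pub struct RetryConfig;

#[derive(Debug, Clone)]
pub struct RequestContext {
    pub config: RetryConfig,
    /// `None` means no concurrency limit is applied
    pub semaphore: Option<Arc<Semaphore>>,
}

impl Default for RequestContext {
    fn default() -> Self {
        Self {
            config: RetryConfig::default(),
            // Unlimited by default; callers opt in to a limit explicitly
            semaphore: None,
        }
    }
}
```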
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]