tustvold commented on code in PR #5538:
URL: https://github.com/apache/arrow-rs/pull/5538#discussion_r1557802430
##########
object_store/src/aws/client.rs:
##########
@@ -468,8 +468,8 @@ impl S3Client {
let response = builder
.header(CONTENT_TYPE, "application/xml")
.body(body)
- .with_aws_sigv4(credential.authorizer(), payload_sha256.as_deref())
- .send_retry_with_idempotency(&self.config.retry_config, false)
Review Comment:
`false` is the default for a `POST` request
##########
object_store/src/aws/dynamo.rs:
##########
@@ -186,11 +186,7 @@ impl DynamoCommit {
to: &Path,
) -> Result<()> {
self.conditional_op(client, to, None, || async {
- client
- .copy_request(from, to)
- .set_idempotent(false)
Review Comment:
`false` is already the default, so this call is redundant
##########
object_store/src/client/retry.rs:
##########
@@ -166,26 +166,54 @@ impl Default for RetryConfig {
}
}
-fn send_retry_impl(
- builder: reqwest::RequestBuilder,
- config: &RetryConfig,
- is_idempotent: Option<bool>,
-) -> BoxFuture<'static, Result<Response>> {
- let mut backoff = Backoff::new(&config.backoff);
- let max_retries = config.max_retries;
- let retry_timeout = config.retry_timeout;
+pub struct RetryableRequest {
+ client: Client,
+ request: Request,
- let (client, req) = builder.build_split();
- let req = req.expect("request must be valid");
- let is_idempotent = is_idempotent.unwrap_or(req.method().is_safe());
+ max_retries: usize,
+ retry_timeout: Duration,
+ backoff: Backoff,
- async move {
+ idempotent: Option<bool>,
+ payload: Option<PutPayload>,
+}
+
+impl RetryableRequest {
+ /// Set whether this request is idempotent
+ pub fn idempotent(self, idempotent: bool) -> Self {
+ Self {
+ idempotent: Some(idempotent),
+ ..self
+ }
+ }
+
+ /// Provide a [`PutPayload`]
+ pub fn payload(self, payload: Option<PutPayload>) -> Self {
+ Self { payload, ..self }
+ }
+
+ pub async fn send(self) -> Result<Response> {
+ let max_retries = self.max_retries;
+ let retry_timeout = self.retry_timeout;
let mut retries = 0;
let now = Instant::now();
+ let mut backoff = self.backoff;
+ let is_idempotent = self
+ .idempotent
+ .unwrap_or_else(|| self.request.method().is_safe());
+
loop {
- let s = req.try_clone().expect("request body must be cloneable");
- match client.execute(s).await {
+ let mut s = self
+ .request
+ .try_clone()
+ .expect("request body must be cloneable");
+
+ if let Some(x) = &self.payload {
+ *s.body_mut() = Some(x.body());
Review Comment:
Setting the body here, on each retry attempt, sidesteps issues around the
cloneability of stream-based request bodies
##########
object_store/src/azure/client.rs:
##########
@@ -237,21 +242,23 @@ impl AzureClient {
builder = builder.header(CONTENT_TYPE, value);
}
- builder = builder
Review Comment:
This has been moved into `send` above
##########
object_store/src/buffered.rs:
##########
@@ -231,7 +231,7 @@ impl std::fmt::Debug for BufWriter {
enum BufWriterState {
/// Buffer up to capacity bytes
- Buffer(Path, Vec<u8>),
+ Buffer(Path, PutPayloadMut),
Review Comment:
This change means that we no longer bump allocate :tada:
##########
object_store/src/gcp/client.rs:
##########
@@ -327,20 +335,24 @@ impl GoogleCloudStorageClient {
let builder = self
.client
.request(Method::PUT, url)
- .header(header::CONTENT_TYPE, content_type)
- .header(header::CONTENT_LENGTH, payload.len())
Review Comment:
Moved into `send`
##########
object_store/src/upload.rs:
##########
@@ -121,7 +122,8 @@ impl WriteMultipart {
pub fn new_with_chunk_size(upload: Box<dyn MultipartUpload>, chunk_size:
usize) -> Self {
Self {
upload,
- buffer: Vec::with_capacity(chunk_size),
+ chunk_size,
+ buffer: PutPayloadMut::new(),
Review Comment:
This change means we avoid bump allocating, and also no longer allocate an
entire 10MB buffer up front only to potentially use a fraction of it
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]