tisonkun commented on code in PR #7118:
URL: https://github.com/apache/opendal/pull/7118#discussion_r2678312874


##########
integrations/object_store/src/store.rs:
##########
@@ -430,15 +413,27 @@ impl ObjectStore for OpendalStore {
         })
     }
 
-    async fn delete(&self, location: &Path) -> object_store::Result<()> {
-        let decoded_location = percent_decode_path(location.as_ref());
-        self.inner
-            .delete(&decoded_location)
-            .into_send()
-            .await
-            .map_err(|err| format_object_store_error(err, location.as_ref()))?;
-
-        Ok(())
+    fn delete_stream(
+        &self,
+        locations: BoxStream<'static, object_store::Result<Path>>,
+    ) -> BoxStream<'static, object_store::Result<Path>> {
+        // TODO: use batch delete to optimize performance
+        let client = self.inner.clone();
+        locations
+            .then(move |location| {
+                let client = client.clone();
+                async move {
+                    let location = location?;
+                    let decoded_location = percent_decode_path(location.as_ref());
+                    client
+                        .delete(&decoded_location)
+                        .into_send()
+                        .await
+                        .map_err(|err| format_object_store_error(err, location.as_ref()))?;
+                    Ok(location)
+                }
+            })
+            .boxed()

Review Comment:
   `object_store` uses multiple batching mechanisms:
   
   ```rust
       fn delete_stream(
           &self,
           locations: BoxStream<'static, Result<Path>>,
       ) -> BoxStream<'static, Result<Path>> {
           let client = Arc::clone(&self.client);
           locations
               .try_chunks(1_000)
               .map(move |locations| {
                   let client = Arc::clone(&client);
                   async move {
                       // Early return the error. We ignore the paths that have already been
                       // collected into the chunk.
                       let locations = locations.map_err(|e| e.1)?;
                       client
                           .bulk_delete_request(locations)
                           .await
                           .map(futures::stream::iter)
                   }
               })
               .buffered(20)
               .try_flatten()
               .boxed()
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to