alamb commented on code in PR #14918:
URL: https://github.com/apache/datafusion/pull/14918#discussion_r1975867484


##########
datafusion/core/src/test/object_store.rs:
##########
@@ -61,3 +71,121 @@ pub fn local_unpartitioned_file(path: impl AsRef<std::path::Path>) -> ObjectMeta
         version: None,
     }
 }
+
+/// Blocks the object_store `head` call until `concurrency` number of calls are pending.
+pub fn ensure_head_concurrency(
+    object_store: Arc<dyn ObjectStore>,
+    concurrency: usize,
+) -> Arc<dyn ObjectStore> {
+    Arc::new(BlockingObjectStore::new(object_store, concurrency))
+}
+
+/// An object store that “blocks” in its `head` call until an expected number of concurrent calls are reached.
+#[derive(Debug)]
+struct BlockingObjectStore {
+    inner: Arc<dyn ObjectStore>,
+    barrier: Arc<Barrier>,
+}
+
+impl BlockingObjectStore {
+    const NAME: &'static str = "BlockingObjectStore";
+    fn new(inner: Arc<dyn ObjectStore>, expected_concurrency: usize) -> Self {
+        Self {
+            inner,
+            barrier: Arc::new(Barrier::new(expected_concurrency)),
+        }
+    }
+}
+
+impl Display for BlockingObjectStore {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        Display::fmt(&self.inner, f)
+    }
+}
+
+/// All trait methods are forwarded to the inner object store, except for
+/// the `head` method which waits until the expected number of concurrent calls is reached.
+#[async_trait::async_trait]
+impl ObjectStore for BlockingObjectStore {
+    async fn put_opts(
+        &self,
+        location: &Path,
+        payload: PutPayload,
+        opts: PutOptions,
+    ) -> object_store::Result<PutResult> {
+        self.inner.put_opts(location, payload, opts).await
+    }
+    async fn put_multipart_opts(
+        &self,
+        location: &Path,
+        opts: PutMultipartOpts,
+    ) -> object_store::Result<Box<dyn MultipartUpload>> {
+        self.inner.put_multipart_opts(location, opts).await
+    }
+
+    async fn get_opts(
+        &self,
+        location: &Path,
+        options: GetOptions,
+    ) -> object_store::Result<GetResult> {
+        self.inner.get_opts(location, options).await
+    }
+
+    async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
+        println!(
+            "{} received head call for {location}",
+            BlockingObjectStore::NAME
+        );
+        // Wait until the expected number of concurrent calls is reached, but timeout after 1 second to avoid hanging failing tests.

Review Comment:
   👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to