alamb commented on code in PR #14918: URL: https://github.com/apache/datafusion/pull/14918#discussion_r1975867484
########## datafusion/core/src/test/object_store.rs: ########## @@ -61,3 +71,121 @@ pub fn local_unpartitioned_file(path: impl AsRef<std::path::Path>) -> ObjectMeta version: None, } } + +/// Blocks the object_store `head` call until `concurrency` number of calls are pending. +pub fn ensure_head_concurrency( + object_store: Arc<dyn ObjectStore>, + concurrency: usize, +) -> Arc<dyn ObjectStore> { + Arc::new(BlockingObjectStore::new(object_store, concurrency)) +} + +/// An object store that “blocks” in its `head` call until an expected number of concurrent calls are reached. +#[derive(Debug)] +struct BlockingObjectStore { + inner: Arc<dyn ObjectStore>, + barrier: Arc<Barrier>, +} + +impl BlockingObjectStore { + const NAME: &'static str = "BlockingObjectStore"; + fn new(inner: Arc<dyn ObjectStore>, expected_concurrency: usize) -> Self { + Self { + inner, + barrier: Arc::new(Barrier::new(expected_concurrency)), + } + } +} + +impl Display for BlockingObjectStore { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + Display::fmt(&self.inner, f) + } +} + +/// All trait methods are forwarded to the inner object store, except for +/// the `head` method which waits until the expected number of concurrent calls is reached. +#[async_trait::async_trait] +impl ObjectStore for BlockingObjectStore { + async fn put_opts( + &self, + location: &Path, + payload: PutPayload, + opts: PutOptions, + ) -> object_store::Result<PutResult> { + self.inner.put_opts(location, payload, opts).await + } + async fn put_multipart_opts( + &self, + location: &Path, + opts: PutMultipartOpts, + ) -> object_store::Result<Box<dyn MultipartUpload>> { + self.inner.put_multipart_opts(location, opts).await + } + + async fn get_opts( + &self, + location: &Path, + options: GetOptions, + ) -> object_store::Result<GetResult> { + self.inner.get_opts(location, options).await + } + + async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> { + println!( + "{} received head call for {location}", + BlockingObjectStore::NAME + ); + // Wait until the expected number of concurrent calls is reached, but timeout after 1 second to avoid hanging failing tests. Review Comment: 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org