alamb commented on code in PR #14918:
URL: https://github.com/apache/datafusion/pull/14918#discussion_r1975867484
##########
datafusion/core/src/test/object_store.rs:
##########
@@ -61,3 +71,121 @@ pub fn local_unpartitioned_file(path: impl AsRef<std::path::Path>) -> ObjectMeta
version: None,
}
}
+
+/// Blocks the object_store `head` call until `concurrency` number of calls are pending.
+pub fn ensure_head_concurrency(
+ object_store: Arc<dyn ObjectStore>,
+ concurrency: usize,
+) -> Arc<dyn ObjectStore> {
+ Arc::new(BlockingObjectStore::new(object_store, concurrency))
+}
+
+/// An object store that “blocks” in its `head` call until an expected number of concurrent calls are reached.
+#[derive(Debug)]
+struct BlockingObjectStore {
+ inner: Arc<dyn ObjectStore>,
+ barrier: Arc<Barrier>,
+}
+
+impl BlockingObjectStore {
+ const NAME: &'static str = "BlockingObjectStore";
+ fn new(inner: Arc<dyn ObjectStore>, expected_concurrency: usize) -> Self {
+ Self {
+ inner,
+ barrier: Arc::new(Barrier::new(expected_concurrency)),
+ }
+ }
+}
+
+impl Display for BlockingObjectStore {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ Display::fmt(&self.inner, f)
+ }
+}
+
+/// All trait methods are forwarded to the inner object store, except for
+/// the `head` method which waits until the expected number of concurrent calls is reached.
+#[async_trait::async_trait]
+impl ObjectStore for BlockingObjectStore {
+ async fn put_opts(
+ &self,
+ location: &Path,
+ payload: PutPayload,
+ opts: PutOptions,
+ ) -> object_store::Result<PutResult> {
+ self.inner.put_opts(location, payload, opts).await
+ }
+ async fn put_multipart_opts(
+ &self,
+ location: &Path,
+ opts: PutMultipartOpts,
+ ) -> object_store::Result<Box<dyn MultipartUpload>> {
+ self.inner.put_multipart_opts(location, opts).await
+ }
+
+ async fn get_opts(
+ &self,
+ location: &Path,
+ options: GetOptions,
+ ) -> object_store::Result<GetResult> {
+ self.inner.get_opts(location, options).await
+ }
+
+ async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
+ println!(
+ "{} received head call for {location}",
+ BlockingObjectStore::NAME
+ );
+ // Wait until the expected number of concurrent calls is reached, but timeout after 1 second to avoid hanging failing tests.
Review Comment:
👍
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]