This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new f8ff82a187 feat: Adds Instrumented Object Store Registry to datafusion-cli (#17953)
f8ff82a187 is described below
commit f8ff82a187a9d127ed5c47f620ae474a3be799d6
Author: Blake Orth <[email protected]>
AuthorDate: Wed Oct 8 14:32:41 2025 -0600
feat: Adds Instrumented Object Store Registry to datafusion-cli (#17953)
* feat: Adds Instrumented Object Store Registry to datafusion-cli
- Adds a new Object Store Registry wrapper to datafusion-cli to support
interacting with instrumented object store instances
- Adds basic tests for the new registry wrapper
* Adds doc comments to new public types and methods
---
datafusion-cli/src/main.rs | 7 +++
datafusion-cli/src/object_storage.rs | 2 +
datafusion-cli/src/object_storage/instrumented.rs | 72 +++++++++++++++++++++++
3 files changed, 81 insertions(+)
diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs
index a6b818c109..39aca4cd13 100644
--- a/datafusion-cli/src/main.rs
+++ b/datafusion-cli/src/main.rs
@@ -27,11 +27,13 @@ use datafusion::execution::context::SessionConfig;
use datafusion::execution::memory_pool::{
FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool,
};
+use datafusion::execution::object_store::DefaultObjectStoreRegistry;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::logical_expr::ExplainFormat;
use datafusion::prelude::SessionContext;
use datafusion_cli::catalog::DynamicObjectStoreCatalog;
use datafusion_cli::functions::{MetadataCacheFunc, ParquetMetadataFunc};
+use datafusion_cli::object_storage::instrumented::InstrumentedObjectStoreRegistry;
use datafusion_cli::{
exec,
pool_type::PoolType,
@@ -206,6 +208,11 @@ async fn main_inner() -> Result<()> {
rt_builder = rt_builder.with_disk_manager_builder(builder);
}
+ let instrumented_registry = Arc::new(InstrumentedObjectStoreRegistry::new(Arc::new(
+ DefaultObjectStoreRegistry::new(),
+ )));
+ rt_builder = rt_builder.with_object_store_registry(instrumented_registry.clone());
+
let runtime_env = rt_builder.build_arc()?;
// enable dynamic file query
diff --git a/datafusion-cli/src/object_storage.rs b/datafusion-cli/src/object_storage.rs
index 533ac3ba03..e6e6be42c7 100644
--- a/datafusion-cli/src/object_storage.rs
+++ b/datafusion-cli/src/object_storage.rs
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+pub mod instrumented;
+
use async_trait::async_trait;
use aws_config::BehaviorVersion;
use aws_credential_types::provider::{
diff --git a/datafusion-cli/src/object_storage/instrumented.rs b/datafusion-cli/src/object_storage/instrumented.rs
new file mode 100644
index 0000000000..c4bd44011d
--- /dev/null
+++ b/datafusion-cli/src/object_storage/instrumented.rs
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use datafusion::execution::object_store::ObjectStoreRegistry;
+use object_store::ObjectStore;
+use url::Url;
+
+/// Provides access to wrapped [`ObjectStore`] instances that record requests for reporting
+#[derive(Debug)]
+pub struct InstrumentedObjectStoreRegistry {
+ inner: Arc<dyn ObjectStoreRegistry>,
+}
+
+impl InstrumentedObjectStoreRegistry {
+ /// Returns a new [`InstrumentedObjectStoreRegistry`] that wraps the provided
+ /// [`ObjectStoreRegistry`]
+ pub fn new(registry: Arc<dyn ObjectStoreRegistry>) -> Self {
+ Self { inner: registry }
+ }
+}
+
+impl ObjectStoreRegistry for InstrumentedObjectStoreRegistry {
+ fn register_store(
+ &self,
+ url: &Url,
+ store: Arc<dyn ObjectStore>,
+ ) -> Option<Arc<dyn ObjectStore>> {
+ self.inner.register_store(url, store)
+ }
+
+ fn get_store(&self, url: &Url) -> datafusion::common::Result<Arc<dyn ObjectStore>> {
+ self.inner.get_store(url)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use datafusion::execution::object_store::DefaultObjectStoreRegistry;
+
+ use super::*;
+
+ #[test]
+ fn instrumented_registry() {
+ let reg = Arc::new(InstrumentedObjectStoreRegistry::new(Arc::new(
+ DefaultObjectStoreRegistry::new(),
+ )));
+ let store = object_store::memory::InMemory::new();
+
+ let url = "mem://test".parse().unwrap();
+ let registered = reg.register_store(&url, Arc::new(store));
+ assert!(registered.is_none());
+
+ let fetched = reg.get_store(&url);
+ assert!(fetched.is_ok())
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]