This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new f8ff82a187 feat: Adds Instrumented Object Store Registry to datafusion-cli (#17953)
f8ff82a187 is described below

commit f8ff82a187a9d127ed5c47f620ae474a3be799d6
Author: Blake Orth <[email protected]>
AuthorDate: Wed Oct 8 14:32:41 2025 -0600

    feat: Adds Instrumented Object Store Registry to datafusion-cli (#17953)
    
    * feat: Adds Instrumented Object Store Registry to datafusion-cli
     - Adds a new Object Store Registry wrapper to datafusion-cli to support
       interacting with instrumented object store instances
     - Adds basic tests for the new registry wrapper
    
    * Adds doc comments to new public types and methods
---
 datafusion-cli/src/main.rs                        |  7 +++
 datafusion-cli/src/object_storage.rs              |  2 +
 datafusion-cli/src/object_storage/instrumented.rs | 72 +++++++++++++++++++++++
 3 files changed, 81 insertions(+)

diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs
index a6b818c109..39aca4cd13 100644
--- a/datafusion-cli/src/main.rs
+++ b/datafusion-cli/src/main.rs
@@ -27,11 +27,13 @@ use datafusion::execution::context::SessionConfig;
 use datafusion::execution::memory_pool::{
     FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool,
 };
+use datafusion::execution::object_store::DefaultObjectStoreRegistry;
 use datafusion::execution::runtime_env::RuntimeEnvBuilder;
 use datafusion::logical_expr::ExplainFormat;
 use datafusion::prelude::SessionContext;
 use datafusion_cli::catalog::DynamicObjectStoreCatalog;
 use datafusion_cli::functions::{MetadataCacheFunc, ParquetMetadataFunc};
+use datafusion_cli::object_storage::instrumented::InstrumentedObjectStoreRegistry;
 use datafusion_cli::{
     exec,
     pool_type::PoolType,
@@ -206,6 +208,11 @@ async fn main_inner() -> Result<()> {
         rt_builder = rt_builder.with_disk_manager_builder(builder);
     }
 
+    let instrumented_registry = Arc::new(InstrumentedObjectStoreRegistry::new(Arc::new(
+        DefaultObjectStoreRegistry::new(),
+    )));
+    rt_builder = rt_builder.with_object_store_registry(instrumented_registry.clone());
+
     let runtime_env = rt_builder.build_arc()?;
 
     // enable dynamic file query
diff --git a/datafusion-cli/src/object_storage.rs b/datafusion-cli/src/object_storage.rs
index 533ac3ba03..e6e6be42c7 100644
--- a/datafusion-cli/src/object_storage.rs
+++ b/datafusion-cli/src/object_storage.rs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+pub mod instrumented;
+
 use async_trait::async_trait;
 use aws_config::BehaviorVersion;
 use aws_credential_types::provider::{
diff --git a/datafusion-cli/src/object_storage/instrumented.rs b/datafusion-cli/src/object_storage/instrumented.rs
new file mode 100644
index 0000000000..c4bd44011d
--- /dev/null
+++ b/datafusion-cli/src/object_storage/instrumented.rs
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use datafusion::execution::object_store::ObjectStoreRegistry;
+use object_store::ObjectStore;
+use url::Url;
+
+/// Provides access to wrapped [`ObjectStore`] instances that record requests for reporting
+#[derive(Debug)]
+pub struct InstrumentedObjectStoreRegistry {
+    inner: Arc<dyn ObjectStoreRegistry>,
+}
+
+impl InstrumentedObjectStoreRegistry {
+    /// Returns a new [`InstrumentedObjectStoreRegistry`] that wraps the provided
+    /// [`ObjectStoreRegistry`]
+    pub fn new(registry: Arc<dyn ObjectStoreRegistry>) -> Self {
+        Self { inner: registry }
+    }
+}
+
+impl ObjectStoreRegistry for InstrumentedObjectStoreRegistry {
+    fn register_store(
+        &self,
+        url: &Url,
+        store: Arc<dyn ObjectStore>,
+    ) -> Option<Arc<dyn ObjectStore>> {
+        self.inner.register_store(url, store)
+    }
+
+    fn get_store(&self, url: &Url) -> datafusion::common::Result<Arc<dyn ObjectStore>> {
+        self.inner.get_store(url)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use datafusion::execution::object_store::DefaultObjectStoreRegistry;
+
+    use super::*;
+
+    #[test]
+    fn instrumented_registry() {
+        let reg = Arc::new(InstrumentedObjectStoreRegistry::new(Arc::new(
+            DefaultObjectStoreRegistry::new(),
+        )));
+        let store = object_store::memory::InMemory::new();
+
+        let url = "mem://test".parse().unwrap();
+        let registered = reg.register_store(&url, Arc::new(store));
+        assert!(registered.is_none());
+
+        let fetched = reg.get_store(&url);
+        assert!(fetched.is_ok())
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to