yahoNanJing commented on code in PR #5543:
URL: https://github.com/apache/arrow-datafusion/pull/5543#discussion_r1132276595


##########
datafusion/execution/src/object_store.rs:
##########
@@ -89,6 +89,138 @@ pub trait ObjectStoreProvider: Send + Sync + 'static {
     fn get_by_url(&self, url: &Url) -> Result<Arc<dyn ObjectStore>>;
 }
 
+/// Provides a mechanism to get and put object stores.
+pub trait ObjectStoreManager: Send + Sync + std::fmt::Debug + 'static {
+    /// If a store with the same schema and host existed before, it is 
replaced and returned
+    fn register_store(
+        &self,
+        scheme: &str,
+        host: &str,
+        store: Arc<dyn ObjectStore>,
+    ) -> Option<Arc<dyn ObjectStore>>;
+
+    /// Get a suitable store for the provided URL. For example:
+    ///
+    /// - URL with scheme `file:///` or no schema will return the default 
LocalFS store
+    /// - URL with scheme `s3://bucket/` will return the S3 store
+    /// - URL with scheme `hdfs://hostname:port/` will return the hdfs store
+    fn get_by_url(&self, url: &Url) -> Result<Arc<dyn ObjectStore>>;
+}
+
+/// The default [`ObjectStoreManager`] is with no `ObjectStoreProvider`.
+pub struct DefaultObjectStoreManager {
+    /// A map from scheme to object store that serve list / read operations 
for the store
+    object_stores: DashMap<String, Arc<dyn ObjectStore>>,
+}
+
+impl std::fmt::Debug for DefaultObjectStoreManager {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        f.debug_struct("DefaultObjectStoreManager")
+            .field(
+                "schemes",
+                &self
+                    .object_stores
+                    .iter()
+                    .map(|o| o.key().clone())
+                    .collect::<Vec<_>>(),
+            )
+            .finish()
+    }
+}
+
+impl Default for DefaultObjectStoreManager {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl DefaultObjectStoreManager {
+    /// This will register [`LocalFileSystem`] to handle `file://` paths
+    pub fn new() -> Self {
+        let object_stores: DashMap<String, Arc<dyn ObjectStore>> = 
DashMap::new();
+        object_stores.insert("file://".to_string(), 
Arc::new(LocalFileSystem::new()));
+        Self { object_stores }
+    }
+}
+
+impl ObjectStoreManager for DefaultObjectStoreManager {
+    fn register_store(
+        &self,
+        scheme: &str,
+        host: &str,
+        store: Arc<dyn ObjectStore>,
+    ) -> Option<Arc<dyn ObjectStore>> {
+        let s = format!("{scheme}://{host}");
+        self.object_stores.insert(s, store)
+    }
+
+    /// Get a suitable store for the provided URL. For example:
+    ///
+    /// - URL with scheme `file:///` or no schema will return the default 
LocalFS store
+    /// - URL with scheme `s3://bucket/` will return the S3 store if it's 
registered
+    /// - URL with scheme `hdfs://hostname:port/` will return the hdfs store 
if it's registered
+    fn get_by_url(&self, url: &Url) -> Result<Arc<dyn ObjectStore>> {
+        let s = &url[url::Position::BeforeScheme..url::Position::BeforePath];
+        self.object_stores
+            .get(s)
+            .map(|o| o.value().clone())
+            .ok_or_else(|| {
+                DataFusionError::Internal(format!(
+                    "No suitable object store found for {url}"
+                ))
+            })
+    }
+}

Review Comment:
   In Ballista, we can create an `ObjectStoreRegistry` with a 
`CacheBasedObjectStoreManager`. Every `ObjectStore` obtained from that 
`ObjectStoreRegistry` will then be wrapped with a cache layer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to