This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 99c9e11e Remove cache functionality (#1076)
99c9e11e is described below

commit 99c9e11e1c55ae34c57b780dd86ded8e3457b1f8
Author: Marko Milenković <[email protected]>
AuthorDate: Sat Oct 12 15:51:57 2024 +0100

    Remove cache functionality (#1076)
    
    While it may be useful, I'd argue that it should not be
    part of core Ballista. We see in the community
    caches built on top of object_store.
    
    If functionality like this is needed, it should be
    implemented as a scheduler policy or similar.
    
    Relates to: #1066 & #1067
---
 Cargo.toml                                         |   2 +-
 ballista/cache/Cargo.toml                          |  38 --
 ballista/cache/README.md                           |  22 -
 ballista/cache/src/backend/mod.rs                  |  73 ---
 ballista/cache/src/backend/policy/lru/lru_cache.rs | 337 ------------
 ballista/cache/src/backend/policy/lru/mod.rs       | 111 ----
 ballista/cache/src/backend/policy/mod.rs           |  61 ---
 ballista/cache/src/lib.rs                          |  54 --
 ballista/cache/src/listener/cache_policy.rs        | 133 -----
 ballista/cache/src/listener/loading_cache.rs       | 197 -------
 ballista/cache/src/listener/mod.rs                 |  19 -
 .../src/loading_cache/cancellation_safe_future.rs  | 179 -------
 ballista/cache/src/loading_cache/driver.rs         | 573 ---------------------
 ballista/cache/src/loading_cache/loader.rs         |  52 --
 ballista/cache/src/loading_cache/mod.rs            | 113 ----
 ballista/cache/src/metrics/loading_cache.rs        | 292 -----------
 ballista/cache/src/metrics/mod.rs                  |  18 -
 ballista/core/Cargo.toml                           |   1 -
 ballista/core/src/cache_layer/medium/local_disk.rs |  69 ---
 .../core/src/cache_layer/medium/local_memory.rs    |  73 ---
 ballista/core/src/cache_layer/medium/mod.rs        |  42 --
 ballista/core/src/cache_layer/mod.rs               | 128 -----
 ballista/core/src/cache_layer/object_store/file.rs | 267 ----------
 ballista/core/src/cache_layer/object_store/mod.rs  | 168 ------
 ballista/core/src/cache_layer/policy/file.rs       | 302 -----------
 ballista/core/src/cache_layer/policy/mod.rs        |  18 -
 ballista/core/src/config.rs                        |   2 -
 ballista/core/src/lib.rs                           |   2 -
 ballista/core/src/object_store_registry/cache.rs   |  86 ----
 ballista/core/src/object_store_registry/mod.rs     |   3 -
 ballista/executor/src/execution_loop.rs            |   2 +-
 ballista/executor/src/executor.rs                  |  20 +-
 ballista/executor/src/executor_process.rs          |  44 --
 ballista/executor/src/executor_server.rs           |  14 +-
 ballista/executor/src/standalone.rs                |   1 -
 ballista/scheduler/src/cluster/mod.rs              |   4 -
 ballista/scheduler/src/state/execution_graph.rs    |   5 +-
 ballista/scheduler/src/state/task_manager.rs       |  77 +--
 38 files changed, 29 insertions(+), 3573 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index ea1c8321..be204446 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,7 +17,7 @@
 
 [workspace]
 exclude = ["python"]
-members = ["ballista-cli", "ballista/cache", "ballista/client", 
"ballista/core", "ballista/executor", "ballista/scheduler", "benchmarks", 
"examples"]
+members = ["ballista-cli", "ballista/client", "ballista/core", 
"ballista/executor", "ballista/scheduler", "benchmarks", "examples"]
 resolver = "2"
 
 [workspace.dependencies]
diff --git a/ballista/cache/Cargo.toml b/ballista/cache/Cargo.toml
deleted file mode 100644
index 78accd29..00000000
--- a/ballista/cache/Cargo.toml
+++ /dev/null
@@ -1,38 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-[package]
-name = "ballista-cache"
-description = "Ballista Cache"
-license = "Apache-2.0"
-homepage = "https://github.com/apache/arrow-ballista";
-repository = "https://github.com/apache/arrow-ballista";
-readme = "README.md"
-authors = ["Apache DataFusion <[email protected]>"]
-version = "0.12.0"
-edition = "2021"
-
-# See more keys and their definitions at 
https://doc.rust-lang.org/cargo/reference/manifest.html
-
-[dependencies]
-async-trait = "0.1.64"
-futures = "0.3"
-hashbrown = "0.14"
-hashlink = "0.8.4"
-log = "0.4"
-parking_lot = "0.12"
-tokio = { version = "1.25", features = ["macros", "parking_lot", 
"rt-multi-thread", "sync", "time"] }
diff --git a/ballista/cache/README.md b/ballista/cache/README.md
deleted file mode 100644
index 4cce0d2b..00000000
--- a/ballista/cache/README.md
+++ /dev/null
@@ -1,22 +0,0 @@
-<!---
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing,
-  software distributed under the License is distributed on an
-  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  KIND, either express or implied.  See the License for the
-  specific language governing permissions and limitations
-  under the License.
--->
-
-# Ballista Cache Library
-
-This crate contains the Ballista cache library which is used as a dependency 
from other Ballista crates.
diff --git a/ballista/cache/src/backend/mod.rs 
b/ballista/cache/src/backend/mod.rs
deleted file mode 100644
index f24119e8..00000000
--- a/ballista/cache/src/backend/mod.rs
+++ /dev/null
@@ -1,73 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-pub mod policy;
-
-use crate::backend::policy::CachePolicy;
-use std::fmt::Debug;
-use std::hash::Hash;
-
-/// Backend to keep and manage stored entries.
-///
-/// A backend might remove entries at any point, e.g. due to memory pressure 
or expiration.
-#[derive(Debug)]
-pub struct CacheBackend<K, V>
-where
-    K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
-    V: Clone + Debug + Send + 'static,
-{
-    policy: Box<dyn CachePolicy<K = K, V = V>>,
-}
-
-impl<K, V> CacheBackend<K, V>
-where
-    K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
-    V: Clone + Debug + Send + 'static,
-{
-    pub fn new(policy: impl CachePolicy<K = K, V = V>) -> Self {
-        Self {
-            policy: Box::new(policy),
-        }
-    }
-
-    /// Get value for given key if it exists.
-    pub fn get(&mut self, k: &K) -> Option<V> {
-        self.policy.get(k)
-    }
-
-    /// Peek value for given key if it exists.
-    ///
-    /// In contrast to [`get`](Self::get) this will only return a value if 
there is a stored value.
-    /// This will not change the cache contents.
-    pub fn peek(&mut self, k: &K) -> Option<V> {
-        self.policy.peek(k)
-    }
-
-    /// Put value for given key.
-    ///
-    /// If a key already exists, its old value will be returned.
-    pub fn put(&mut self, k: K, v: V) -> Option<V> {
-        self.policy.put(k, v).0
-    }
-
-    /// Remove value for given key.
-    ///
-    /// If a key does not exist, none will be returned.
-    pub fn remove(&mut self, k: &K) -> Option<V> {
-        self.policy.remove(k)
-    }
-}
diff --git a/ballista/cache/src/backend/policy/lru/lru_cache.rs 
b/ballista/cache/src/backend/policy/lru/lru_cache.rs
deleted file mode 100644
index 284a4a28..00000000
--- a/ballista/cache/src/backend/policy/lru/lru_cache.rs
+++ /dev/null
@@ -1,337 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::backend::policy::lru::ResourceCounter;
-use crate::backend::policy::{lru::LruCachePolicy, CachePolicy, 
CachePolicyPutResult};
-use hashbrown::hash_map::DefaultHashBuilder;
-use hashlink::linked_hash_map::{self, IntoIter, Iter, IterMut, LinkedHashMap};
-use std::any::Any;
-use std::fmt;
-use std::fmt::Debug;
-use std::hash::{BuildHasher, Hash};
-
-pub struct LruCache<K, V, H = DefaultHashBuilder>
-where
-    K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
-    V: Clone + Debug + Send + 'static,
-{
-    map: LinkedHashMap<K, V, H>,
-    resource_counter: Box<dyn ResourceCounter<K = K, V = V>>,
-}
-
-impl<K, V> LruCache<K, V>
-where
-    K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
-    V: Clone + Debug + Send + 'static,
-{
-    pub fn with_resource_counter<R>(resource_counter: R) -> Self
-    where
-        R: ResourceCounter<K = K, V = V>,
-    {
-        LruCache {
-            map: LinkedHashMap::new(),
-            resource_counter: Box::new(resource_counter),
-        }
-    }
-}
-
-impl<K, V, H> LruCache<K, V, H>
-where
-    K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
-    V: Clone + Debug + Send + 'static,
-{
-    pub fn with_resource_counter_and_hasher<R>(
-        resource_counter: R,
-        hash_builder: H,
-    ) -> Self
-    where
-        R: ResourceCounter<K = K, V = V>,
-    {
-        LruCache {
-            map: LinkedHashMap::with_hasher(hash_builder),
-            resource_counter: Box::new(resource_counter),
-        }
-    }
-
-    pub fn len(&self) -> usize {
-        self.map.len()
-    }
-
-    pub fn is_empty(&self) -> bool {
-        self.map.is_empty()
-    }
-
-    pub fn iter(&self) -> Iter<K, V> {
-        self.map.iter()
-    }
-
-    pub fn iter_mut(&mut self) -> IterMut<K, V> {
-        self.map.iter_mut()
-    }
-}
-
-impl<K, V, H> LruCachePolicy for LruCache<K, V, H>
-where
-    K: 'static + Clone + Debug + Eq + Hash + Ord + Send,
-    H: 'static + BuildHasher + Debug + Send,
-    V: 'static + Clone + Debug + Send,
-{
-    fn get_lru(&mut self, k: &Self::K) -> Option<Self::V> {
-        match self.map.raw_entry_mut().from_key(k) {
-            linked_hash_map::RawEntryMut::Occupied(mut occupied) => {
-                occupied.to_back();
-                Some(occupied.into_mut().clone())
-            }
-            linked_hash_map::RawEntryMut::Vacant(_) => None,
-        }
-    }
-
-    fn put_lru(
-        &mut self,
-        k: Self::K,
-        v: Self::V,
-    ) -> CachePolicyPutResult<Self::K, Self::V> {
-        let old_val = self.map.insert(k.clone(), v.clone());
-        // Consume resources for (k, v)
-        self.resource_counter.consume(&k, &v);
-        // Restore resources for old (k, old_val)
-        if let Some(old_val) = &old_val {
-            self.resource_counter.restore(&k, old_val);
-        }
-
-        let mut popped_entries = vec![];
-        while self.resource_counter.exceed_capacity() {
-            if let Some(entry) = self.pop_lru() {
-                popped_entries.push(entry);
-            }
-        }
-        (old_val, popped_entries)
-    }
-
-    fn pop_lru(&mut self) -> Option<(Self::K, Self::V)> {
-        if let Some(entry) = self.map.pop_front() {
-            self.resource_counter.restore(&entry.0, &entry.1);
-            Some(entry)
-        } else {
-            None
-        }
-    }
-}
-
-impl<K, V, H> CachePolicy for LruCache<K, V, H>
-where
-    K: 'static + Clone + Debug + Eq + Hash + Ord + Send,
-    H: 'static + BuildHasher + Debug + Send,
-    V: 'static + Clone + Debug + Send,
-{
-    type K = K;
-    type V = V;
-
-    fn get(&mut self, k: &Self::K) -> Option<Self::V> {
-        self.get_lru(k)
-    }
-
-    fn peek(&mut self, k: &Self::K) -> Option<Self::V> {
-        self.map.get(k).cloned()
-    }
-
-    fn put(&mut self, k: Self::K, v: Self::V) -> CachePolicyPutResult<Self::K, 
Self::V> {
-        self.put_lru(k, v)
-    }
-
-    fn remove(&mut self, k: &Self::K) -> Option<Self::V> {
-        if let Some(v) = self.map.remove(k) {
-            self.resource_counter.restore(k, &v);
-            Some(v)
-        } else {
-            None
-        }
-    }
-
-    fn pop(&mut self) -> Option<(Self::K, Self::V)> {
-        self.pop_lru()
-    }
-
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-}
-
-impl<K, V, H> IntoIterator for LruCache<K, V, H>
-where
-    K: 'static + Clone + Debug + Eq + Hash + Ord + Send,
-    V: 'static + Clone + Debug + Send,
-{
-    type Item = (K, V);
-    type IntoIter = IntoIter<K, V>;
-
-    fn into_iter(self) -> IntoIter<K, V> {
-        self.map.into_iter()
-    }
-}
-
-impl<'a, K, V, H> IntoIterator for &'a LruCache<K, V, H>
-where
-    K: 'static + Clone + Debug + Eq + Hash + Ord + Send,
-    V: 'static + Clone + Debug + Send,
-{
-    type Item = (&'a K, &'a V);
-    type IntoIter = Iter<'a, K, V>;
-
-    fn into_iter(self) -> Iter<'a, K, V> {
-        self.iter()
-    }
-}
-
-impl<'a, K, V, H> IntoIterator for &'a mut LruCache<K, V, H>
-where
-    K: 'static + Clone + Debug + Eq + Hash + Ord + Send,
-    V: 'static + Clone + Debug + Send,
-{
-    type Item = (&'a K, &'a mut V);
-    type IntoIter = IterMut<'a, K, V>;
-
-    fn into_iter(self) -> IterMut<'a, K, V> {
-        self.iter_mut()
-    }
-}
-
-impl<K, V, H> Debug for LruCache<K, V, H>
-where
-    K: 'static + Clone + Debug + Eq + Hash + Ord + Send,
-    V: 'static + Clone + Debug + Send,
-{
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        f.debug_map().entries(self.iter().rev()).finish()
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use crate::backend::policy::lru::lru_cache::LruCache;
-    use crate::backend::policy::lru::{DefaultResourceCounter, ResourceCounter};
-    use crate::backend::policy::CachePolicy;
-    use hashbrown::HashMap;
-
-    #[test]
-    fn test_cache_with_lru_policy() {
-        let mut cache = 
LruCache::with_resource_counter(DefaultResourceCounter::new(3));
-
-        cache.put("1".to_string(), "file1".to_string());
-        cache.put("2".to_string(), "file2".to_string());
-        cache.put("3".to_string(), "file3".to_string());
-        assert_eq!(3, cache.len());
-
-        cache.put("4".to_string(), "file4".to_string());
-        assert_eq!(3, cache.len());
-        assert!(cache.peek(&"1".to_string()).is_none());
-
-        assert!(cache.peek(&"2".to_string()).is_some());
-        let mut iter = cache.iter();
-        assert_eq!("2", iter.next().unwrap().0);
-        assert_eq!("3", iter.next().unwrap().0);
-        assert_eq!("4", iter.next().unwrap().0);
-
-        assert!(cache.get(&"2".to_string()).is_some());
-        let mut iter = cache.iter();
-        assert_eq!("3", iter.next().unwrap().0);
-        assert_eq!("4", iter.next().unwrap().0);
-        assert_eq!("2", iter.next().unwrap().0);
-
-        assert_eq!(Some("file4".to_string()), cache.remove(&"4".to_string()));
-
-        assert_eq!("3".to_string(), cache.pop().unwrap().0);
-        assert_eq!("2".to_string(), cache.pop().unwrap().0);
-        assert!(cache.pop().is_none());
-    }
-
-    #[test]
-    fn test_cache_with_size_resource_counter() {
-        let mut cache =
-            
LruCache::with_resource_counter(get_test_size_resource_counter(50));
-
-        cache.put("1".to_string(), "file1".to_string());
-        cache.put("2".to_string(), "file2".to_string());
-        cache.put("3".to_string(), "file3".to_string());
-        assert_eq!(3, cache.len());
-
-        cache.put("4".to_string(), "file4".to_string());
-        assert_eq!(2, cache.len());
-        assert!(cache.peek(&"1".to_string()).is_none());
-        assert!(cache.peek(&"2".to_string()).is_none());
-
-        assert!(cache.peek(&"3".to_string()).is_some());
-        let mut iter = cache.iter();
-        assert_eq!("3", iter.next().unwrap().0);
-        assert_eq!("4", iter.next().unwrap().0);
-
-        assert!(cache.get(&"3".to_string()).is_some());
-        let mut iter = cache.iter();
-        assert_eq!("4", iter.next().unwrap().0);
-        assert_eq!("3", iter.next().unwrap().0);
-
-        cache.put("5".to_string(), "file5".to_string());
-        cache.put("3".to_string(), "file3-bak".to_string());
-        cache.put("1".to_string(), "file1".to_string());
-        let mut iter = cache.iter();
-        assert_eq!("3", iter.next().unwrap().0);
-        assert_eq!("1", iter.next().unwrap().0);
-        assert!(iter.next().is_none());
-    }
-
-    fn get_test_size_resource_counter(max_size: usize) -> 
TestSizeResourceCounter {
-        let mut size_map = HashMap::new();
-        size_map.insert(("1".to_string(), "file1".to_string()), 10);
-        size_map.insert(("2".to_string(), "file2".to_string()), 20);
-        size_map.insert(("3".to_string(), "file3".to_string()), 15);
-        size_map.insert(("3".to_string(), "file3-bak".to_string()), 30);
-        size_map.insert(("4".to_string(), "file4".to_string()), 35);
-        size_map.insert(("5".to_string(), "file5".to_string()), 25);
-
-        TestSizeResourceCounter {
-            size_map,
-            max_size,
-            current_size: 0,
-        }
-    }
-
-    #[derive(Debug)]
-    struct TestSizeResourceCounter {
-        size_map: HashMap<(String, String), usize>,
-        max_size: usize,
-        current_size: usize,
-    }
-
-    impl ResourceCounter for TestSizeResourceCounter {
-        type K = String;
-        type V = String;
-
-        fn consume(&mut self, k: &Self::K, v: &Self::V) {
-            let s = self.size_map.get(&(k.clone(), v.clone())).unwrap();
-            self.current_size += s;
-        }
-
-        fn restore(&mut self, k: &Self::K, v: &Self::V) {
-            let s = self.size_map.get(&(k.clone(), v.clone())).unwrap();
-            self.current_size -= s;
-        }
-
-        fn exceed_capacity(&self) -> bool {
-            self.current_size > self.max_size
-        }
-    }
-}
diff --git a/ballista/cache/src/backend/policy/lru/mod.rs 
b/ballista/cache/src/backend/policy/lru/mod.rs
deleted file mode 100644
index a9b85569..00000000
--- a/ballista/cache/src/backend/policy/lru/mod.rs
+++ /dev/null
@@ -1,111 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-pub mod lru_cache;
-
-use crate::backend::policy::CachePolicyPutResult;
-use crate::backend::CachePolicy;
-use std::fmt::Debug;
-use std::hash::Hash;
-use std::marker::PhantomData;
-
-pub trait LruCachePolicy: CachePolicy {
-    /// Retrieve the value for the given key,
-    /// marking it as recently used and moving it to the back of the LRU list.
-    fn get_lru(&mut self, k: &Self::K) -> Option<Self::V>;
-
-    /// Put value for given key.
-    ///
-    /// If a key already exists, its old value will be returned.
-    ///
-    /// If necessary, will remove the value at the front of the LRU list to 
make room.
-    fn put_lru(
-        &mut self,
-        k: Self::K,
-        v: Self::V,
-    ) -> CachePolicyPutResult<Self::K, Self::V>;
-
-    /// Remove the least recently used entry and return it.
-    ///
-    /// If the `LruCache` is empty this will return None.
-    fn pop_lru(&mut self) -> Option<(Self::K, Self::V)>;
-}
-
-pub trait ResourceCounter: Debug + Send + 'static {
-    /// Resource key.
-    type K: Clone + Eq + Hash + Ord + Debug + Send + 'static;
-
-    /// Resource value.
-    type V: Clone + Debug + Send + 'static;
-
-    /// Consume resource for a given key-value pair.
-    fn consume(&mut self, k: &Self::K, v: &Self::V);
-
-    /// Return resource for a given key-value pair.
-    fn restore(&mut self, k: &Self::K, v: &Self::V);
-
-    /// Check whether the current used resource exceeds the capacity
-    fn exceed_capacity(&self) -> bool;
-}
-
-#[derive(Debug, Clone, Copy)]
-pub struct DefaultResourceCounter<K, V>
-where
-    K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
-    V: Clone + Debug + Send + 'static,
-{
-    max_num: usize,
-    current_num: usize,
-    _key_marker: PhantomData<K>,
-    _value_marker: PhantomData<V>,
-}
-
-impl<K, V> DefaultResourceCounter<K, V>
-where
-    K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
-    V: Clone + Debug + Send + 'static,
-{
-    pub fn new(capacity: usize) -> Self {
-        Self {
-            max_num: capacity,
-            current_num: 0,
-            _key_marker: PhantomData,
-            _value_marker: PhantomData,
-        }
-    }
-}
-
-impl<K, V> ResourceCounter for DefaultResourceCounter<K, V>
-where
-    K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
-    V: Clone + Debug + Send + 'static,
-{
-    type K = K;
-    type V = V;
-
-    fn consume(&mut self, _k: &Self::K, _v: &Self::V) {
-        self.current_num += 1;
-    }
-
-    fn restore(&mut self, _k: &Self::K, _v: &Self::V) {
-        self.current_num -= 1;
-    }
-
-    fn exceed_capacity(&self) -> bool {
-        self.current_num > self.max_num
-    }
-}
diff --git a/ballista/cache/src/backend/policy/mod.rs 
b/ballista/cache/src/backend/policy/mod.rs
deleted file mode 100644
index 26f17137..00000000
--- a/ballista/cache/src/backend/policy/mod.rs
+++ /dev/null
@@ -1,61 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::any::Any;
-use std::fmt::Debug;
-use std::hash::Hash;
-
-pub mod lru;
-
-pub type CachePolicyPutResult<K, V> = (Option<V>, Vec<(K, V)>);
-
-pub trait CachePolicy: Debug + Send + 'static {
-    /// Cache key.
-    type K: Clone + Eq + Hash + Ord + Debug + Send + 'static;
-
-    /// Cached value.
-    type V: Clone + Debug + Send + 'static;
-
-    /// Get value for given key if it exists.
-    fn get(&mut self, k: &Self::K) -> Option<Self::V>;
-
-    /// Peek value for given key if it exists.
-    ///
-    /// In contrast to [`get`](Self::get) this will only return a value if 
there is a stored value.
-    /// This will not change the cache entries.
-    fn peek(&mut self, k: &Self::K) -> Option<Self::V>;
-
-    /// Put value for given key.
-    ///
-    /// If a key already exists, its old value will be returned.
-    ///
-    /// At the meanwhile, entries popped due to memory pressure will be 
returned
-    fn put(&mut self, k: Self::K, v: Self::V) -> CachePolicyPutResult<Self::K, 
Self::V>;
-
-    /// Remove value for given key.
-    ///
-    /// If a key does not exist, none will be returned.
-    fn remove(&mut self, k: &Self::K) -> Option<Self::V>;
-
-    /// Remove an entry from the cache due to memory pressure or expiration.
-    ///
-    /// If the cache is empty, none will be returned.
-    fn pop(&mut self) -> Option<(Self::K, Self::V)>;
-
-    /// Return backend as [`Any`] which can be used to downcast to a specific 
implementation.
-    fn as_any(&self) -> &dyn Any;
-}
diff --git a/ballista/cache/src/lib.rs b/ballista/cache/src/lib.rs
deleted file mode 100644
index 1cba039c..00000000
--- a/ballista/cache/src/lib.rs
+++ /dev/null
@@ -1,54 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::backend::policy::CachePolicy;
-use crate::backend::CacheBackend;
-use crate::listener::{
-    cache_policy::CachePolicyWithListener, 
loading_cache::LoadingCacheWithListener,
-};
-use crate::loading_cache::{driver::CacheDriver, loader::CacheLoader};
-use std::fmt::Debug;
-use std::hash::Hash;
-use std::sync::Arc;
-
-pub mod backend;
-pub mod listener;
-pub mod loading_cache;
-pub mod metrics;
-
-pub type DefaultLoadingCache<K, V, L> = 
LoadingCacheWithListener<CacheDriver<K, V, L>>;
-pub type LoadingCacheMetrics<K, V> = metrics::loading_cache::Metrics<K, V>;
-
-pub fn create_loading_cache_with_metrics<K, V, L>(
-    policy: impl CachePolicy<K = K, V = V>,
-    loader: Arc<L>,
-) -> (DefaultLoadingCache<K, V, L>, Arc<LoadingCacheMetrics<K, V>>)
-where
-    K: Clone + Eq + Hash + Debug + Ord + Send + 'static,
-    V: Clone + Debug + Send + 'static,
-    L: CacheLoader<K = K, V = V>,
-{
-    let metrics = Arc::new(metrics::loading_cache::Metrics::new());
-
-    let policy_with_metrics = CachePolicyWithListener::new(policy, 
vec![metrics.clone()]);
-    let cache_backend = CacheBackend::new(policy_with_metrics);
-    let loading_cache = CacheDriver::new(cache_backend, loader);
-    let loading_cache_with_metrics =
-        LoadingCacheWithListener::new(loading_cache, vec![metrics.clone()]);
-
-    (loading_cache_with_metrics, metrics)
-}
diff --git a/ballista/cache/src/listener/cache_policy.rs 
b/ballista/cache/src/listener/cache_policy.rs
deleted file mode 100644
index 83917ea8..00000000
--- a/ballista/cache/src/listener/cache_policy.rs
+++ /dev/null
@@ -1,133 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::backend::policy::{CachePolicy, CachePolicyPutResult};
-use std::any::Any;
-use std::fmt::Debug;
-use std::hash::Hash;
-use std::sync::Arc;
-
-pub trait CachePolicyListener: Debug + Send + Sync + 'static {
-    /// Cache key.
-    type K: Clone + Eq + Hash + Debug + Ord + Send + 'static;
-
-    /// Cache value.
-    type V: Clone + Debug + Send + 'static;
-
-    fn listen_on_get(&self, k: Self::K, v: Option<Self::V>);
-
-    fn listen_on_peek(&self, k: Self::K, v: Option<Self::V>);
-
-    fn listen_on_put(&self, k: Self::K, v: Self::V, old_v: Option<Self::V>);
-
-    fn listen_on_remove(&self, k: Self::K, v: Option<Self::V>);
-
-    fn listen_on_pop(&self, entry: (Self::K, Self::V));
-}
-
-#[derive(Debug)]
-pub struct CachePolicyWithListener<P>
-where
-    P: CachePolicy,
-{
-    inner: P,
-    listeners: Vec<Arc<dyn CachePolicyListener<K = P::K, V = P::V>>>,
-}
-
-impl<P> CachePolicyWithListener<P>
-where
-    P: CachePolicy,
-{
-    pub fn new(
-        inner: P,
-        listeners: Vec<Arc<dyn CachePolicyListener<K = P::K, V = P::V>>>,
-    ) -> Self {
-        Self { inner, listeners }
-    }
-}
-
-impl<P> CachePolicy for CachePolicyWithListener<P>
-where
-    P: CachePolicy,
-{
-    type K = P::K;
-    type V = P::V;
-
-    fn get(&mut self, k: &Self::K) -> Option<Self::V> {
-        let v = self.inner.get(k);
-
-        // For listeners
-        self.listeners
-            .iter()
-            .for_each(|listener| listener.listen_on_get(k.clone(), 
v.as_ref().cloned()));
-
-        v
-    }
-
-    fn peek(&mut self, k: &Self::K) -> Option<Self::V> {
-        let v = self.inner.peek(k);
-
-        // For listeners
-        self.listeners
-            .iter()
-            .for_each(|listener| listener.listen_on_peek(k.clone(), 
v.as_ref().cloned()));
-
-        v
-    }
-
-    fn put(&mut self, k: Self::K, v: Self::V) -> CachePolicyPutResult<Self::K, 
Self::V> {
-        let ret = self.inner.put(k.clone(), v.clone());
-
-        // For listeners
-        self.listeners.iter().for_each(|listener| {
-            listener.listen_on_put(k.clone(), v.clone(), 
ret.0.as_ref().cloned());
-            ret.1
-                .iter()
-                .for_each(|entry| listener.listen_on_pop(entry.clone()));
-        });
-
-        ret
-    }
-
-    fn remove(&mut self, k: &Self::K) -> Option<Self::V> {
-        let v = self.inner.remove(k);
-
-        // For listeners
-        self.listeners.iter().for_each(|listener| {
-            listener.listen_on_remove(k.clone(), v.as_ref().cloned())
-        });
-
-        v
-    }
-
-    fn pop(&mut self) -> Option<(Self::K, Self::V)> {
-        let entry = self.inner.pop();
-
-        // For listeners
-        if let Some(entry) = &entry {
-            self.listeners
-                .iter()
-                .for_each(|listener| listener.listen_on_pop(entry.clone()));
-        }
-
-        entry
-    }
-
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-}
diff --git a/ballista/cache/src/listener/loading_cache.rs 
b/ballista/cache/src/listener/loading_cache.rs
deleted file mode 100644
index 09f61c3b..00000000
--- a/ballista/cache/src/listener/loading_cache.rs
+++ /dev/null
@@ -1,197 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::loading_cache::{CacheGetStatus, LoadingCache};
-use async_trait::async_trait;
-use std::fmt::Debug;
-use std::hash::Hash;
-use std::sync::Arc;
-
-pub trait LoadingCacheListener: Debug + Send + Sync + 'static {
-    /// Cache key.
-    type K: Clone + Eq + Hash + Debug + Ord + Send + 'static;
-
-    /// Cache value.
-    type V: Clone + Debug + Send + 'static;
-
-    fn listen_on_get_if_present(&self, k: Self::K, v: Option<Self::V>);
-
-    fn listen_on_get(&self, k: Self::K, v: Self::V, status: CacheGetStatus);
-
-    fn listen_on_put(&self, k: Self::K, v: Self::V);
-
-    fn listen_on_invalidate(&self, k: Self::K);
-
-    fn listen_on_get_cancelling(&self, k: Self::K);
-}
-
-#[derive(Debug)]
-pub struct LoadingCacheWithListener<L>
-where
-    L: LoadingCache,
-{
-    inner: L,
-    listeners: Vec<Arc<dyn LoadingCacheListener<K = L::K, V = L::V>>>,
-}
-
-impl<L> LoadingCacheWithListener<L>
-where
-    L: LoadingCache,
-{
-    pub fn new(
-        inner: L,
-        listeners: Vec<Arc<dyn LoadingCacheListener<K = L::K, V = L::V>>>,
-    ) -> Self {
-        Self { inner, listeners }
-    }
-}
-
-#[async_trait]
-impl<L> LoadingCache for LoadingCacheWithListener<L>
-where
-    L: LoadingCache,
-{
-    type K = L::K;
-    type V = L::V;
-    type GetExtra = L::GetExtra;
-
-    fn get_if_present(&self, k: Self::K) -> Option<Self::V> {
-        let v = self.inner.get_if_present(k.clone());
-        self.listen_on_get_if_present(k, v.as_ref().cloned());
-        v
-    }
-
-    async fn get_with_status(
-        &self,
-        k: Self::K,
-        extra: Self::GetExtra,
-    ) -> (Self::V, CacheGetStatus) {
-        let mut set_on_drop = SetGetListenerOnDrop::new(self, k.clone());
-        let (v, status) = self.inner.get_with_status(k, extra).await;
-        set_on_drop.get_result = Some((v.clone(), status));
-        (v, status)
-    }
-
-    async fn put(&self, k: Self::K, v: Self::V) {
-        let k_captured = k.clone();
-        let v_captured = v.clone();
-        self.inner.put(k_captured, v_captured).await;
-        self.listen_on_put(k, v);
-    }
-
-    fn invalidate(&self, k: Self::K) {
-        self.inner.invalidate(k.clone());
-        self.listen_on_invalidate(k);
-    }
-}
-
-struct SetGetListenerOnDrop<'a, L>
-where
-    L: LoadingCache,
-{
-    listener: &'a LoadingCacheWithListener<L>,
-    key: L::K,
-    get_result: Option<(L::V, CacheGetStatus)>,
-}
-
-impl<'a, L> SetGetListenerOnDrop<'a, L>
-where
-    L: LoadingCache,
-{
-    fn new(listener: &'a LoadingCacheWithListener<L>, key: L::K) -> Self {
-        Self {
-            listener,
-            key,
-            get_result: None,
-        }
-    }
-}
-
-impl<'a, L> Drop for SetGetListenerOnDrop<'a, L>
-where
-    L: LoadingCache,
-{
-    fn drop(&mut self) {
-        if let Some((value, status)) = &self.get_result {
-            self.listener
-                .listen_on_get(self.key.clone(), value.clone(), *status)
-        } else {
-            self.listener.listen_on_get_cancelling(self.key.clone());
-        }
-    }
-}
-
-impl<L> LoadingCacheListener for LoadingCacheWithListener<L>
-where
-    L: LoadingCache,
-{
-    type K = L::K;
-    type V = L::V;
-
-    fn listen_on_get_if_present(&self, k: Self::K, v: Option<Self::V>) {
-        if self.listeners.len() == 1 {
-            self.listeners
-                .first()
-                .unwrap()
-                .listen_on_get_if_present(k, v);
-        } else {
-            self.listeners.iter().for_each(|listener| {
-                listener.listen_on_get_if_present(k.clone(), 
v.as_ref().cloned())
-            });
-        }
-    }
-
-    fn listen_on_get(&self, k: Self::K, v: Self::V, status: CacheGetStatus) {
-        if self.listeners.len() == 1 {
-            self.listeners.first().unwrap().listen_on_get(k, v, status);
-        } else {
-            self.listeners.iter().for_each(|listener| {
-                listener.listen_on_get(k.clone(), v.clone(), status)
-            });
-        }
-    }
-
-    fn listen_on_put(&self, k: Self::K, v: Self::V) {
-        if self.listeners.len() == 1 {
-            self.listeners.first().unwrap().listen_on_put(k, v);
-        } else {
-            self.listeners
-                .iter()
-                .for_each(|listener| listener.listen_on_put(k.clone(), 
v.clone()));
-        }
-    }
-
-    fn listen_on_invalidate(&self, k: Self::K) {
-        if self.listeners.len() == 1 {
-            self.listeners.first().unwrap().listen_on_invalidate(k);
-        } else {
-            self.listeners
-                .iter()
-                .for_each(|listener| listener.listen_on_invalidate(k.clone()));
-        }
-    }
-
-    fn listen_on_get_cancelling(&self, k: Self::K) {
-        if self.listeners.len() == 1 {
-            self.listeners.first().unwrap().listen_on_get_cancelling(k);
-        } else {
-            self.listeners
-                .iter()
-                .for_each(|listener| 
listener.listen_on_get_cancelling(k.clone()));
-        }
-    }
-}
diff --git a/ballista/cache/src/listener/mod.rs 
b/ballista/cache/src/listener/mod.rs
deleted file mode 100644
index 670362e1..00000000
--- a/ballista/cache/src/listener/mod.rs
+++ /dev/null
@@ -1,19 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-pub mod cache_policy;
-pub mod loading_cache;
diff --git a/ballista/cache/src/loading_cache/cancellation_safe_future.rs 
b/ballista/cache/src/loading_cache/cancellation_safe_future.rs
deleted file mode 100644
index 73017034..00000000
--- a/ballista/cache/src/loading_cache/cancellation_safe_future.rs
+++ /dev/null
@@ -1,179 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::{
-    future::Future,
-    pin::Pin,
-    sync::Arc,
-    task::{Context, Poll},
-};
-
-use futures::future::BoxFuture;
-use parking_lot::Mutex;
-use tokio::task::JoinHandle;
-
-/// Wrapper around a future that cannot be cancelled.
-///
-/// When the future is dropped/cancelled, we'll spawn a tokio task to _rescue_ 
it.
-pub struct CancellationSafeFuture<F>
-where
-    F: Future + Send + 'static,
-    F::Output: Send,
-{
-    /// Mark if the inner future finished. If not, we must spawn a helper task 
on drop.
-    done: bool,
-
-    /// Inner future.
-    ///
-    /// Wrapped in an `Option` so we can extract it during drop. Inside that 
option however we also need a pinned
-    /// box because once this wrapper is polled, it will be pinned in memory 
-- even during drop. Now the inner
-    /// future does not necessarily implement `Unpin`, so we need a heap 
allocation to pin it in memory even when we
-    /// move it out of this option.
-    inner: Option<BoxFuture<'static, F::Output>>,
-
-    /// Where to store the join handle on drop.
-    receiver: Arc<Mutex<Option<JoinHandle<F::Output>>>>,
-}
-
-impl<F> Drop for CancellationSafeFuture<F>
-where
-    F: Future + Send + 'static,
-    F::Output: Send,
-{
-    fn drop(&mut self) {
-        if !self.done {
-            // acquire lock BEFORE checking the Arc
-            let mut receiver = self.receiver.lock();
-            assert!(receiver.is_none());
-
-            // The Mutex is owned by the Arc and cannot be moved out of it. So 
after we acquired the lock we can safely
-            // check if any external party still has access to the receiver 
state. If not, we assume there is no
-            // interest in this future at all (e.g. during shutdown) and will 
NOT spawn it.
-            if Arc::strong_count(&self.receiver) > 1 {
-                let inner = self.inner.take().expect("Double-drop?");
-                let handle = tokio::task::spawn(inner);
-                *receiver = Some(handle);
-            }
-        }
-    }
-}
-
-impl<F> CancellationSafeFuture<F>
-where
-    F: Future + Send,
-    F::Output: Send,
-{
-    /// Create new future that is protected from cancellation.
-    ///
-    /// If [`CancellationSafeFuture`] is cancelled (i.e. dropped) and there is 
still some external receiver of the state
-    /// left, than we will drive the payload (`f`) to completion. Otherwise 
`f` will be cancelled.
-    pub fn new(fut: F, receiver: Arc<Mutex<Option<JoinHandle<F::Output>>>>) -> 
Self {
-        Self {
-            done: false,
-            inner: Some(Box::pin(fut)),
-            receiver,
-        }
-    }
-}
-
-impl<F> Future for CancellationSafeFuture<F>
-where
-    F: Future + Send,
-    F::Output: Send,
-{
-    type Output = F::Output;
-
-    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> 
Poll<Self::Output> {
-        assert!(!self.done, "Polling future that already returned");
-
-        match self.inner.as_mut().expect("not dropped").as_mut().poll(cx) {
-            Poll::Ready(res) => {
-                self.done = true;
-                Poll::Ready(res)
-            }
-            Poll::Pending => Poll::Pending,
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use std::{
-        sync::atomic::{AtomicBool, Ordering},
-        time::Duration,
-    };
-
-    use tokio::sync::Barrier;
-
-    use super::*;
-
-    #[tokio::test]
-    async fn test_happy_path() {
-        let done = Arc::new(AtomicBool::new(false));
-        let done_captured = Arc::clone(&done);
-
-        let receiver = Default::default();
-        let fut = CancellationSafeFuture::new(
-            async move {
-                done_captured.store(true, Ordering::SeqCst);
-            },
-            receiver,
-        );
-
-        fut.await;
-
-        assert!(done.load(Ordering::SeqCst));
-    }
-
-    #[tokio::test]
-    async fn test_cancel_future() {
-        let done = Arc::new(Barrier::new(2));
-        let done_captured = Arc::clone(&done);
-
-        let receiver = Default::default();
-        let fut = CancellationSafeFuture::new(
-            async move {
-                done_captured.wait().await;
-            },
-            Arc::clone(&receiver),
-        );
-
-        drop(fut);
-
-        tokio::time::timeout(Duration::from_secs(5), done.wait())
-            .await
-            .unwrap();
-    }
-
-    #[tokio::test]
-    async fn test_receiver_gone() {
-        let done = Arc::new(Barrier::new(2));
-        let done_captured = Arc::clone(&done);
-
-        let receiver = Default::default();
-        let fut = CancellationSafeFuture::new(
-            async move {
-                done_captured.wait().await;
-            },
-            receiver,
-        );
-
-        drop(fut);
-
-        assert_eq!(Arc::strong_count(&done), 1);
-    }
-}
diff --git a/ballista/cache/src/loading_cache/driver.rs 
b/ballista/cache/src/loading_cache/driver.rs
deleted file mode 100644
index 614c6ead..00000000
--- a/ballista/cache/src/loading_cache/driver.rs
+++ /dev/null
@@ -1,573 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Main data structure, see [`CacheDriver`].
-
-use crate::backend::CacheBackend;
-use crate::loading_cache::{
-    cancellation_safe_future::CancellationSafeFuture,
-    loader::CacheLoader,
-    {CacheGetStatus, LoadingCache},
-};
-use async_trait::async_trait;
-use futures::future::{BoxFuture, Shared};
-use futures::{FutureExt, TryFutureExt};
-use log::debug;
-use parking_lot::Mutex;
-use std::collections::HashMap;
-use std::fmt::Debug;
-use std::future::Future;
-use std::hash::Hash;
-use std::sync::Arc;
-use tokio::{
-    sync::oneshot::{error::RecvError, Sender},
-    task::JoinHandle,
-};
-
-/// Combine a [`CacheBackend`] and a [`Loader`] into a single [`Cache`]
-#[derive(Debug)]
-pub struct CacheDriver<K, V, L>
-where
-    K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
-    V: Clone + Debug + Send + 'static,
-    L: CacheLoader<K = K, V = V>,
-{
-    state: Arc<Mutex<CacheState<K, V>>>,
-    loader: Arc<L>,
-}
-
-impl<K, V, L> CacheDriver<K, V, L>
-where
-    K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
-    V: Clone + Debug + Send + 'static,
-    L: CacheLoader<K = K, V = V>,
-{
-    /// Create new, empty cache with given loader function.
-    pub fn new(backend: CacheBackend<K, V>, loader: Arc<L>) -> Self {
-        Self {
-            state: Arc::new(Mutex::new(CacheState {
-                cached_entries: backend,
-                loaders: HashMap::new(),
-                next_loader_tag: 0,
-            })),
-            loader,
-        }
-    }
-}
-
-#[async_trait]
-impl<K, V, L> LoadingCache for CacheDriver<K, V, L>
-where
-    K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
-    V: Clone + Debug + Send + 'static,
-    L: CacheLoader<K = K, V = V>,
-{
-    type K = K;
-    type V = V;
-    type GetExtra = L::Extra;
-
-    fn get_if_present(&self, k: Self::K) -> Option<Self::V> {
-        self.state.lock().cached_entries.get(&k)
-    }
-
-    async fn get_with_status(
-        &self,
-        k: Self::K,
-        extra: Self::GetExtra,
-    ) -> (Self::V, CacheGetStatus) {
-        // place state locking into its own scope so it doesn't leak into the 
generator (async
-        // function)
-        let (fut, receiver, status) = {
-            let mut state = self.state.lock();
-
-            // check if the entry has already been cached
-            if let Some(v) = state.cached_entries.get(&k) {
-                return (v, CacheGetStatus::Hit);
-            }
-
-            // check if there is already a running loader for this key
-            if let Some(loader) = state.loaders.get(&k) {
-                (
-                    None,
-                    loader.recv.clone(),
-                    CacheGetStatus::MissAlreadyLoading,
-                )
-            } else {
-                // generate unique tag
-                let loader_tag = state.next_loader_tag();
-
-                // requires new loader
-                let (fut, loader) = create_value_loader(
-                    self.state.clone(),
-                    self.loader.clone(),
-                    loader_tag,
-                    k.clone(),
-                    extra,
-                );
-
-                let receiver = loader.recv.clone();
-                state.loaders.insert(k, loader);
-
-                (Some(fut), receiver, CacheGetStatus::Miss)
-            }
-        };
-
-        // try to run the loader future in this very task context to avoid 
spawning tokio tasks (which adds latency and
-        // overhead)
-        if let Some(fut) = fut {
-            fut.await;
-        }
-
-        let v = retrieve_from_shared(receiver).await;
-
-        (v, status)
-    }
-
-    async fn put(&self, k: Self::K, v: Self::V) {
-        let maybe_join_handle = {
-            let mut state = self.state.lock();
-
-            let maybe_recv = if let Some(loader) = state.loaders.remove(&k) {
-                // it's OK when the receiver side is gone (likely panicked)
-                loader.set.send(v.clone()).ok();
-
-                // When we side-load data into the running task, the task does 
NOT modify the
-                // backend, so we have to do that. The reason for not letting 
the task feed the
-                // side-loaded data back into `cached_entries` is that we 
would need to drop the
-                // state lock here before the task could acquire it, leading 
to a lock gap.
-                Some(loader.recv)
-            } else {
-                None
-            };
-
-            state.cached_entries.put(k, v);
-
-            maybe_recv
-        };
-
-        // drive running loader (if any) to completion
-        if let Some(recv) = maybe_join_handle {
-            // we do not care if the loader died (e.g. due to a panic)
-            recv.await.ok();
-        }
-    }
-
-    fn invalidate(&self, k: Self::K) {
-        let mut state = self.state.lock();
-
-        if state.loaders.remove(&k).is_some() {
-            debug!("Running loader for key {:?} is removed", k);
-        }
-
-        state.cached_entries.remove(&k);
-    }
-}
-
-impl<K, V, L> Drop for CacheDriver<K, V, L>
-where
-    K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
-    V: Clone + Debug + Send + 'static,
-    L: CacheLoader<K = K, V = V>,
-{
-    fn drop(&mut self) {
-        for (_k, loader) in self.state.lock().loaders.drain() {
-            // It's unlikely that anyone is still using the shared receiver at 
this point, because
-            // `Cache::get` borrows the `self`. If it is still in use, 
aborting the task will
-            // cancel the contained future which in turn will drop the sender 
of the oneshot
-            // channel. The receivers will be notified.
-            let handle = loader.join_handle.lock();
-            if let Some(handle) = handle.as_ref() {
-                handle.abort();
-            }
-        }
-    }
-}
-
-fn create_value_loader<K, V, Extra>(
-    state: Arc<Mutex<CacheState<K, V>>>,
-    loader: Arc<dyn CacheLoader<K = K, V = V, Extra = Extra>>,
-    loader_tag: u64,
-    k: K,
-    extra: Extra,
-) -> (
-    CancellationSafeFuture<impl Future<Output = ()>>,
-    ValueLoader<V>,
-)
-where
-    K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
-    V: Clone + Debug + Send + 'static,
-    Extra: Debug + Send + 'static,
-{
-    let (tx_main, rx_main) = tokio::sync::oneshot::channel();
-    let receiver = rx_main
-        .map_ok(|v| Arc::new(Mutex::new(v)))
-        .map_err(Arc::new)
-        .boxed()
-        .shared();
-    let (tx_set, rx_set) = tokio::sync::oneshot::channel();
-
-    // need to wrap the loader into a `CancellationSafeFuture` so that it 
doesn't get cancelled when
-    // this very request is cancelled
-    let join_handle_receiver = Arc::new(Mutex::new(None));
-    let fut = async move {
-        let loader_fut = async move {
-            let mut submitter = ResultSubmitter::new(state, k.clone(), 
loader_tag);
-
-            // execute the loader
-            // If we panic here then `tx` will be dropped and the receivers 
will be
-            // notified.
-            let v = loader.load(k, extra).await;
-
-            // remove "running" state and store result
-            let was_running = submitter.submit(v.clone());
-
-            if !was_running {
-                // value was side-loaded, so we cannot populate `v`. Instead 
block this
-                // execution branch and wait for `rx_set` to deliver the 
side-loaded
-                // result.
-                loop {
-                    tokio::task::yield_now().await;
-                }
-            }
-
-            v
-        };
-
-        // prefer the side-loader
-        let v = futures::select_biased! {
-            maybe_v = rx_set.fuse() => {
-                match maybe_v {
-                    Ok(v) => {
-                        // data get side-loaded via `Cache::set`. In this 
case, we do
-                        // NOT modify the state because there would be a 
lock-gap. The
-                        // `set` function will do that for us instead.
-                        v
-                    }
-                    Err(_) => {
-                        // sender side is gone, very likely the cache is 
shutting down
-                        debug!(
-                            "Sender for side-loading data into running loader 
gone.",
-                        );
-                        return;
-                    }
-                }
-            }
-            v = loader_fut.fuse() => v,
-        };
-
-        // broadcast result
-        // It's OK if the receiver side is gone. This might happen during 
shutdown
-        tx_main.send(v).ok();
-    };
-    let fut = CancellationSafeFuture::new(fut, 
Arc::clone(&join_handle_receiver));
-
-    (
-        fut,
-        ValueLoader {
-            recv: receiver,
-            set: tx_set,
-            join_handle: join_handle_receiver,
-            tag: loader_tag,
-        },
-    )
-}
-
-/// Inner cache state that is usually guarded by a lock.
-///
-/// The state parts must be updated in a consistent manner, i.e. while using 
the same lock guard.
-#[derive(Debug)]
-struct CacheState<K, V>
-where
-    K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
-    V: Clone + Debug + Send + 'static,
-{
-    /// Cached entries (i.e. queries completed).
-    cached_entries: CacheBackend<K, V>,
-
-    /// Currently value loaders indexed by cache key.
-    loaders: HashMap<K, ValueLoader<V>>,
-
-    /// Tag used for the next value loader to distinguish loaders for the same 
key
-    /// (e.g. when starting, side-loading, starting again)
-    next_loader_tag: u64,
-}
-
-impl<K, V> CacheState<K, V>
-where
-    K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
-    V: Clone + Debug + Send + 'static,
-{
-    /// To avoid overflow issue, it will begin from 0. It will rarely happen 
that
-    /// two value loaders share the same key and tag while for different 
purposes
-    #[inline]
-    fn next_loader_tag(&mut self) -> u64 {
-        let ret = self.next_loader_tag;
-        if self.next_loader_tag != u64::MAX {
-            self.next_loader_tag += 1;
-        } else {
-            self.next_loader_tag = 0;
-        }
-        ret
-    }
-}
-
-/// State for coordinating the execution of a single value loader.
-#[derive(Debug)]
-struct ValueLoader<V> {
-    /// A receiver that can await the result.
-    recv: SharedReceiver<V>,
-
-    /// A sender that enables setting entries while the query is running.
-    set: Sender<V>,
-
-    /// A handle for the task that is currently loading the value.
-    ///
-    /// The handle can be used to abort the loading, e.g. when dropping the 
cache.
-    join_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
-
-    /// Tag so that loaders for the same key (e.g. when starting, 
side-loading, starting again) can
-    /// be told apart.
-    tag: u64,
-}
-
-/// A [`tokio::sync::oneshot::Receiver`] that can be cloned.
-///
-/// The types are:
-///
-/// - `Arc<Mutex<V>>`: Ensures that we can clone `V` without requiring `V: 
Sync`. At the same time
-///   the reference to `V` (i.e. the `Arc`) must be cloneable for `Shared`
-/// - `Arc<RecvError>`: Is required because `RecvError` is not `Clone` but 
`Shared` requires that.
-/// - `BoxFuture`: The transformation from `Result<V, RecvError>` to 
`Result<Arc<Mutex<V>>,
-///   Arc<RecvError>>` results in a kinda messy type and we wanna erase that.
-/// - `Shared`: Allow the receiver to be cloned and be awaited from multiple 
places.
-type SharedReceiver<V> =
-    Shared<BoxFuture<'static, Result<Arc<Mutex<V>>, Arc<RecvError>>>>;
-
-/// Retrieve data from shared receiver.
-async fn retrieve_from_shared<V>(receiver: SharedReceiver<V>) -> V
-where
-    V: Clone + Send,
-{
-    receiver
-        .await
-        .expect("cache loader panicked, see logs")
-        .lock()
-        .clone()
-}
-
-/// Helper to submit results of running queries.
-///
-/// Ensures that running loader is removed when dropped (e.g. during panic).
-struct ResultSubmitter<K, V>
-where
-    K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
-    V: Clone + Debug + Send + 'static,
-{
-    state: Arc<Mutex<CacheState<K, V>>>,
-    tag: u64,
-    k: Option<K>,
-    v: Option<V>,
-}
-
-impl<K, V> ResultSubmitter<K, V>
-where
-    K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
-    V: Clone + Debug + Send + 'static,
-{
-    fn new(state: Arc<Mutex<CacheState<K, V>>>, k: K, tag: u64) -> Self {
-        Self {
-            state,
-            tag,
-            k: Some(k),
-            v: None,
-        }
-    }
-
-    /// Submit value.
-    ///
-    /// Returns `true` if this very loader was running.
-    fn submit(&mut self, v: V) -> bool {
-        assert!(self.v.is_none());
-        self.v = Some(v);
-        self.finalize()
-    }
-
-    /// Finalize request.
-    ///
-    /// Returns `true` if this very loader was running.
-    fn finalize(&mut self) -> bool {
-        let k = self.k.take().expect("finalized twice");
-        let mut state = self.state.lock();
-
-        match state.loaders.get(&k) {
-            Some(loader) if loader.tag == self.tag => {
-                state.loaders.remove(&k);
-
-                if let Some(v) = self.v.take() {
-                    // this very loader is in charge of the key, so store in 
in the
-                    // underlying cache
-                    state.cached_entries.put(k, v);
-                }
-
-                true
-            }
-            _ => {
-                // This loader is actually not really running any longer but 
got
-                // shut down, e.g. due to side loading. Do NOT store the
-                // generated value in the underlying cache.
-
-                false
-            }
-        }
-    }
-}
-
-impl<K, V> Drop for ResultSubmitter<K, V>
-where
-    K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
-    V: Clone + Debug + Send + 'static,
-{
-    fn drop(&mut self) {
-        if self.k.is_some() {
-            // not finalized yet
-            self.finalize();
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-
-    use crate::backend::policy::lru::lru_cache::LruCache;
-    use crate::listener::cache_policy::CachePolicyListener;
-    use crate::{CacheBackend, CacheDriver, CacheLoader, 
CachePolicyWithListener};
-
-    use crate::backend::policy::lru::DefaultResourceCounter;
-    use crate::loading_cache::LoadingCache;
-    use async_trait::async_trait;
-    use parking_lot::Mutex;
-    use std::sync::mpsc::{channel, Sender};
-    use std::sync::Arc;
-
-    #[tokio::test]
-    async fn test_removal_entries() {
-        let cache_policy =
-            LruCache::with_resource_counter(DefaultResourceCounter::new(3));
-        let loader = TestStringCacheLoader {
-            prefix: "file".to_string(),
-        };
-        let (sender, receiver) = channel::<(String, String)>();
-        let listener = Arc::new(EntryRemovalListener::new(sender));
-        let policy_with_listener =
-            CachePolicyWithListener::new(cache_policy, vec![listener.clone()]);
-        let cache_backend = CacheBackend::new(policy_with_listener);
-        let loading_cache = CacheDriver::new(cache_backend, Arc::new(loader));
-
-        assert_eq!(
-            "file1".to_string(),
-            loading_cache.get("1".to_string(), ()).await
-        );
-        assert_eq!(
-            "file2".to_string(),
-            loading_cache.get("2".to_string(), ()).await
-        );
-        assert_eq!(
-            "file3".to_string(),
-            loading_cache.get("3".to_string(), ()).await
-        );
-        assert_eq!(
-            "file4".to_string(),
-            loading_cache.get("4".to_string(), ()).await
-        );
-        assert_eq!(Ok(("1".to_string(), "file1".to_string())), 
receiver.recv());
-        assert!(loading_cache.get_if_present("1".to_string()).is_none());
-
-        loading_cache
-            .put("2".to_string(), "file2-bak".to_string())
-            .await;
-        assert_eq!(
-            "file5".to_string(),
-            loading_cache.get("5".to_string(), ()).await
-        );
-        assert_eq!(Ok(("3".to_string(), "file3".to_string())), 
receiver.recv());
-        assert!(loading_cache.get_if_present("3".to_string()).is_none());
-        assert!(loading_cache.get_if_present("2".to_string()).is_some());
-
-        loading_cache.invalidate("2".to_string());
-        assert_eq!(
-            Ok(("2".to_string(), "file2-bak".to_string())),
-            receiver.recv()
-        );
-        assert!(loading_cache.get_if_present("2".to_string()).is_none());
-    }
-
-    #[derive(Debug)]
-    struct EntryRemovalListener {
-        sender: Arc<Mutex<Sender<(String, String)>>>,
-    }
-
-    impl EntryRemovalListener {
-        pub fn new(sender: Sender<(String, String)>) -> Self {
-            Self {
-                sender: Arc::new(Mutex::new(sender)),
-            }
-        }
-    }
-
-    impl CachePolicyListener for EntryRemovalListener {
-        type K = String;
-        type V = String;
-
-        fn listen_on_get(&self, _k: Self::K, _v: Option<Self::V>) {
-            // Do nothing
-        }
-
-        fn listen_on_peek(&self, _k: Self::K, _v: Option<Self::V>) {
-            // Do nothing
-        }
-
-        fn listen_on_put(&self, _k: Self::K, _v: Self::V, _old_v: 
Option<Self::V>) {
-            // Do nothing
-        }
-
-        fn listen_on_remove(&self, k: Self::K, v: Option<Self::V>) {
-            if let Some(v) = v {
-                self.sender.lock().send((k, v)).unwrap();
-            }
-        }
-
-        fn listen_on_pop(&self, entry: (Self::K, Self::V)) {
-            self.sender.lock().send(entry).unwrap();
-        }
-    }
-
-    #[derive(Debug)]
-    struct TestStringCacheLoader {
-        prefix: String,
-    }
-
-    #[async_trait]
-    impl CacheLoader for TestStringCacheLoader {
-        type K = String;
-        type V = String;
-        type Extra = ();
-
-        async fn load(&self, k: Self::K, _extra: Self::Extra) -> Self::V {
-            format!("{}{k}", self.prefix)
-        }
-    }
-}
diff --git a/ballista/cache/src/loading_cache/loader.rs 
b/ballista/cache/src/loading_cache/loader.rs
deleted file mode 100644
index 8987d88b..00000000
--- a/ballista/cache/src/loading_cache/loader.rs
+++ /dev/null
@@ -1,52 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use async_trait::async_trait;
-use std::fmt::Debug;
-use std::hash::Hash;
-
-/// Loader for missing [`Cache`](crate::cache::Cache) entries.
-#[async_trait]
-pub trait CacheLoader: Debug + Send + Sync + 'static {
-    /// Cache key.
-    type K: Debug + Hash + Send + 'static;
-
-    /// Cache value.
-    type V: Debug + Send + 'static;
-
-    /// Extra data needed when loading a missing entry. Specify `()` if not 
needed.
-    type Extra: Debug + Send + 'static;
-
-    /// Load value for given key, using the extra data if needed.
-    async fn load(&self, k: Self::K, extra: Self::Extra) -> Self::V;
-}
-
-#[async_trait]
-impl<K, V, Extra> CacheLoader for Box<dyn CacheLoader<K = K, V = V, Extra = 
Extra>>
-where
-    K: Debug + Hash + Send + 'static,
-    V: Debug + Send + 'static,
-    Extra: Debug + Send + 'static,
-{
-    type K = K;
-    type V = V;
-    type Extra = Extra;
-
-    async fn load(&self, k: Self::K, extra: Self::Extra) -> Self::V {
-        self.as_ref().load(k, extra).await
-    }
-}
diff --git a/ballista/cache/src/loading_cache/mod.rs 
b/ballista/cache/src/loading_cache/mod.rs
deleted file mode 100644
index bf6f0360..00000000
--- a/ballista/cache/src/loading_cache/mod.rs
+++ /dev/null
@@ -1,113 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-/// It's a fork version from the 
influxdb_iox::cache_system[https://github.com/influxdata/influxdb_iox].
-/// Later we will propose to influxdb team to make this cache part more 
general.
-mod cancellation_safe_future;
-pub mod driver;
-pub mod loader;
-
-use async_trait::async_trait;
-use std::fmt::Debug;
-use std::hash::Hash;
-
-/// High-level loading cache interface.
-///
-/// Cache entries are manually added using get(Key, GetExtra) or put(Key, 
Value),
-/// and are stored in the cache until either evicted or manually invalidated.
-///
-/// # Concurrency
-///
-/// Multiple cache requests for different keys can run at the same time. When 
data is requested for
-/// the same key, the underlying loader will only be polled once, even when 
the requests are made
-/// while the loader is still running.
-///
-/// # Cancellation
-///
-/// Canceling a [`get`](Self::get) request will NOT cancel the underlying 
loader. The data will
-/// still be cached.
-#[async_trait]
-pub trait LoadingCache: Debug + Send + Sync + 'static {
-    /// Cache key.
-    type K: Clone + Eq + Hash + Debug + Ord + Send + 'static;
-
-    /// Cache value.
-    type V: Clone + Debug + Send + 'static;
-
-    /// Extra data that is provided during [`GET`](Self::get) but that is NOT 
part of the cache key.
-    type GetExtra: Debug + Send + 'static;
-
-    /// Get value from cache.
-    ///
-    /// In contrast to [`get`](Self::get) this will only return a value if 
there is a stored value.
-    /// This will NOT start a new loading task.
-    fn get_if_present(&self, k: Self::K) -> Option<Self::V>;
-
-    /// Get value from cache.
-    ///
-    /// Note that `extra` is only used if the key is missing from the storage 
backend
-    /// and no value loader for this key is running yet.
-    async fn get(&self, k: Self::K, extra: Self::GetExtra) -> Self::V {
-        self.get_with_status(k, extra).await.0
-    }
-
-    /// Get value from cache and the [status](CacheGetStatus).
-    ///
-    /// Note that `extra` is only used if the key is missing from the storage 
backend
-    /// and no value loader for this key is running yet.
-    async fn get_with_status(
-        &self,
-        k: Self::K,
-        extra: Self::GetExtra,
-    ) -> (Self::V, CacheGetStatus);
-
-    /// Side-load an entry into the cache. If the cache previously contained a 
value associated with key,
-    /// the old value is replaced by value.
-    ///
-    /// This will also complete a currently running loader for this key.
-    async fn put(&self, k: Self::K, v: Self::V);
-
-    /// Discard any cached value for the key.
-    ///
-    /// This will also interrupt a currently running loader for this key.
-    fn invalidate(&self, k: Self::K);
-}
-
-/// Status of a [`Cache`] [GET](LoadingCache::get_with_status) request.
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub enum CacheGetStatus {
-    /// The requested entry was present in the storage backend.
-    Hit,
-
-    /// The requested entry was NOT present in the storage backend and there's 
no running value loader.
-    Miss,
-
-    /// The requested entry was NOT present in the storage backend, but there 
was already a running value loader for
-    /// this particular key.
-    MissAlreadyLoading,
-}
-
-impl CacheGetStatus {
-    /// Get human and machine readable name.
-    pub fn name(&self) -> &'static str {
-        match self {
-            Self::Hit => "hit",
-            Self::Miss => "miss",
-            Self::MissAlreadyLoading => "miss_already_loading",
-        }
-    }
-}
diff --git a/ballista/cache/src/metrics/loading_cache.rs 
b/ballista/cache/src/metrics/loading_cache.rs
deleted file mode 100644
index 92a2e4fc..00000000
--- a/ballista/cache/src/metrics/loading_cache.rs
+++ /dev/null
@@ -1,292 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::loading_cache::CacheGetStatus;
-use std::fmt::Debug;
-use std::hash::Hash;
-use std::marker::PhantomData;
-
-use crate::listener::cache_policy::CachePolicyListener;
-use crate::listener::loading_cache::LoadingCacheListener;
-use std::sync::atomic::{AtomicU64, Ordering};
-use std::sync::Arc;
-
-/// Struct containing all the metrics
-#[derive(Debug)]
-pub struct Metrics<K, V>
-where
-    K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
-    V: Clone + Debug + Send + 'static,
-{
-    get_hit_count: U64Counter,
-    get_miss_count: U64Counter,
-    get_miss_already_loading_count: U64Counter,
-    get_cancelled_count: U64Counter,
-    put_count: U64Counter,
-    eviction_count: U64Counter,
-    _key_marker: PhantomData<K>,
-    _value_marker: PhantomData<V>,
-}
-
-impl<K, V> Default for Metrics<K, V>
-where
-    K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
-    V: Clone + Debug + Send + 'static,
-{
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
-impl<K, V> Metrics<K, V>
-where
-    K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
-    V: Clone + Debug + Send + 'static,
-{
-    pub fn new() -> Self {
-        Self {
-            get_hit_count: Default::default(),
-            get_miss_count: Default::default(),
-            get_miss_already_loading_count: Default::default(),
-            get_cancelled_count: Default::default(),
-            put_count: Default::default(),
-            eviction_count: Default::default(),
-            _key_marker: Default::default(),
-            _value_marker: Default::default(),
-        }
-    }
-
-    pub fn get_hit_count(&self) -> u64 {
-        self.get_hit_count.fetch()
-    }
-
-    pub fn get_miss_count(&self) -> u64 {
-        self.get_miss_count.fetch()
-    }
-
-    pub fn get_miss_already_loading_count(&self) -> u64 {
-        self.get_miss_already_loading_count.fetch()
-    }
-
-    pub fn get_cancelled_count(&self) -> u64 {
-        self.get_cancelled_count.fetch()
-    }
-
-    pub fn put_count(&self) -> u64 {
-        self.put_count.fetch()
-    }
-
-    pub fn eviction_count(&self) -> u64 {
-        self.eviction_count.fetch()
-    }
-}
-
-// Since we don't store K and V directly, it will be safe.
-unsafe impl<K, V> Sync for Metrics<K, V>
-where
-    K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
-    V: Clone + Debug + Send + 'static,
-{
-}
-
-impl<K, V> LoadingCacheListener for Metrics<K, V>
-where
-    K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
-    V: Clone + Debug + Send + 'static,
-{
-    type K = K;
-    type V = V;
-
-    fn listen_on_get_if_present(&self, _k: Self::K, v: Option<Self::V>) {
-        if v.is_some() {
-            &self.get_hit_count
-        } else {
-            &self.get_miss_count
-        }
-        .inc(1);
-    }
-
-    fn listen_on_get(&self, _k: Self::K, _v: Self::V, status: CacheGetStatus) {
-        match status {
-            CacheGetStatus::Hit => &self.get_hit_count,
-
-            CacheGetStatus::Miss => &self.get_miss_count,
-
-            CacheGetStatus::MissAlreadyLoading => 
&self.get_miss_already_loading_count,
-        }
-        .inc(1);
-    }
-
-    fn listen_on_put(&self, _k: Self::K, _v: Self::V) {
-        // Do nothing
-    }
-
-    fn listen_on_invalidate(&self, _k: Self::K) {
-        // Do nothing
-    }
-
-    fn listen_on_get_cancelling(&self, _k: Self::K) {
-        self.get_cancelled_count.inc(1);
-    }
-}
-
-impl<K, V> CachePolicyListener for Metrics<K, V>
-where
-    K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
-    V: Clone + Debug + Send + 'static,
-{
-    type K = K;
-    type V = V;
-
-    fn listen_on_get(&self, _k: Self::K, _v: Option<Self::V>) {
-        // Do nothing
-    }
-
-    fn listen_on_peek(&self, _k: Self::K, _v: Option<Self::V>) {
-        // Do nothing
-    }
-
-    fn listen_on_put(&self, _k: Self::K, _v: Self::V, _old_v: Option<Self::V>) 
{
-        self.put_count.inc(1);
-    }
-
-    fn listen_on_remove(&self, _k: Self::K, _v: Option<Self::V>) {
-        self.eviction_count.inc(1);
-    }
-
-    fn listen_on_pop(&self, _entry: (Self::K, Self::V)) {
-        self.eviction_count.inc(1);
-    }
-}
-
-/// A monotonic counter
-#[derive(Debug, Clone, Default)]
-pub struct U64Counter {
-    counter: Arc<AtomicU64>,
-}
-
-impl U64Counter {
-    pub fn inc(&self, count: u64) {
-        self.counter.fetch_add(count, Ordering::Relaxed);
-    }
-
-    pub fn fetch(&self) -> u64 {
-        self.counter.load(Ordering::Relaxed)
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use crate::backend::policy::lru::lru_cache::LruCache;
-    use crate::backend::policy::lru::DefaultResourceCounter;
-    use crate::create_loading_cache_with_metrics;
-    use crate::loading_cache::loader::CacheLoader;
-    use crate::loading_cache::LoadingCache;
-    use async_trait::async_trait;
-    use std::sync::Arc;
-
-    #[tokio::test]
-    async fn test_metrics() {
-        let cache_policy =
-            LruCache::with_resource_counter(DefaultResourceCounter::new(3));
-        let loader = TestStringCacheLoader {
-            prefix: "file".to_string(),
-        };
-        let (loading_cache, metrics) =
-            create_loading_cache_with_metrics(cache_policy, Arc::new(loader));
-
-        assert_eq!(
-            "file1".to_string(),
-            loading_cache.get("1".to_string(), ()).await
-        );
-        assert_eq!(
-            "file2".to_string(),
-            loading_cache.get("2".to_string(), ()).await
-        );
-        assert_eq!(
-            "file3".to_string(),
-            loading_cache.get("3".to_string(), ()).await
-        );
-        assert_eq!(3, metrics.get_miss_count());
-
-        assert_eq!(
-            "file4".to_string(),
-            loading_cache.get("4".to_string(), ()).await
-        );
-        assert_eq!(0, metrics.get_hit_count());
-        assert_eq!(4, metrics.get_miss_count());
-        assert_eq!(4, metrics.put_count());
-        assert_eq!(1, metrics.eviction_count());
-
-        assert!(loading_cache.get_if_present("1".to_string()).is_none());
-        assert_eq!(0, metrics.get_hit_count());
-        assert_eq!(5, metrics.get_miss_count());
-        assert_eq!(4, metrics.put_count());
-        assert_eq!(1, metrics.eviction_count());
-
-        loading_cache
-            .put("2".to_string(), "file2-bak".to_string())
-            .await;
-        assert_eq!(0, metrics.get_hit_count());
-        assert_eq!(5, metrics.get_miss_count());
-        assert_eq!(5, metrics.put_count());
-        assert_eq!(1, metrics.eviction_count());
-
-        assert_eq!(
-            "file5".to_string(),
-            loading_cache.get("5".to_string(), ()).await
-        );
-        assert_eq!(0, metrics.get_hit_count());
-        assert_eq!(6, metrics.get_miss_count());
-        assert_eq!(6, metrics.put_count());
-        assert_eq!(2, metrics.eviction_count());
-
-        assert!(loading_cache.get_if_present("3".to_string()).is_none());
-        assert_eq!(0, metrics.get_hit_count());
-        assert_eq!(7, metrics.get_miss_count());
-        assert_eq!(6, metrics.put_count());
-        assert_eq!(2, metrics.eviction_count());
-
-        assert!(loading_cache.get_if_present("2".to_string()).is_some());
-        assert_eq!(1, metrics.get_hit_count());
-        assert_eq!(7, metrics.get_miss_count());
-        assert_eq!(6, metrics.put_count());
-        assert_eq!(2, metrics.eviction_count());
-
-        loading_cache.invalidate("2".to_string());
-        assert_eq!(1, metrics.get_hit_count());
-        assert_eq!(7, metrics.get_miss_count());
-        assert_eq!(6, metrics.put_count());
-        assert_eq!(3, metrics.eviction_count());
-    }
-
-    #[derive(Debug)]
-    struct TestStringCacheLoader {
-        prefix: String,
-    }
-
-    #[async_trait]
-    impl CacheLoader for TestStringCacheLoader {
-        type K = String;
-        type V = String;
-        type Extra = ();
-
-        async fn load(&self, k: Self::K, _extra: Self::Extra) -> Self::V {
-            format!("{}{k}", self.prefix)
-        }
-    }
-}
diff --git a/ballista/cache/src/metrics/mod.rs 
b/ballista/cache/src/metrics/mod.rs
deleted file mode 100644
index c1b6bcbb..00000000
--- a/ballista/cache/src/metrics/mod.rs
+++ /dev/null
@@ -1,18 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-pub mod loading_cache;
diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml
index daabe8a2..884b3f60 100644
--- a/ballista/core/Cargo.toml
+++ b/ballista/core/Cargo.toml
@@ -48,7 +48,6 @@ s3 = ["object_store/aws"]
 ahash = { version = "0.8", default-features = false }
 arrow-flight = { workspace = true }
 async-trait = "0.1.41"
-ballista-cache = { path = "../cache", version = "0.12.0" }
 bytes = "1.0"
 chrono = { version = "0.4", default-features = false }
 clap = { workspace = true }
diff --git a/ballista/core/src/cache_layer/medium/local_disk.rs 
b/ballista/core/src/cache_layer/medium/local_disk.rs
deleted file mode 100644
index e30d8089..00000000
--- a/ballista/core/src/cache_layer/medium/local_disk.rs
+++ /dev/null
@@ -1,69 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::cache_layer::medium::CacheMedium;
-use crate::cache_layer::object_store::ObjectStoreWithKey;
-use object_store::local::LocalFileSystem;
-use object_store::path::{Path, DELIMITER};
-use object_store::ObjectStore;
-use std::any::Any;
-use std::fmt::{Display, Formatter};
-use std::sync::Arc;
-
-#[derive(Debug, Clone)]
-pub struct LocalDiskMedium {
-    cache_object_store: Arc<LocalFileSystem>,
-    root_cache_dir: Path,
-}
-
-impl LocalDiskMedium {
-    pub fn new(root_cache_dir: String) -> Self {
-        Self {
-            cache_object_store: Arc::new(LocalFileSystem::new()),
-            root_cache_dir: Path::from(root_cache_dir),
-        }
-    }
-}
-
-impl Display for LocalDiskMedium {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        write!(f, "Cache medium with local disk({})", self.root_cache_dir)
-    }
-}
-
-impl CacheMedium for LocalDiskMedium {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn get_object_store(&self) -> Arc<dyn ObjectStore> {
-        self.cache_object_store.clone()
-    }
-
-    fn get_mapping_location(
-        &self,
-        source_location: &Path,
-        source_object_store: &ObjectStoreWithKey,
-    ) -> Path {
-        let cache_location = format!(
-            "{}{DELIMITER}{}{DELIMITER}{source_location}",
-            self.root_cache_dir,
-            source_object_store.key(),
-        );
-        Path::from(cache_location)
-    }
-}
diff --git a/ballista/core/src/cache_layer/medium/local_memory.rs 
b/ballista/core/src/cache_layer/medium/local_memory.rs
deleted file mode 100644
index 2b89392e..00000000
--- a/ballista/core/src/cache_layer/medium/local_memory.rs
+++ /dev/null
@@ -1,73 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::cache_layer::medium::CacheMedium;
-use crate::cache_layer::object_store::ObjectStoreWithKey;
-use object_store::memory::InMemory;
-use object_store::path::{Path, DELIMITER};
-use object_store::ObjectStore;
-use std::any::Any;
-use std::fmt::{Display, Formatter};
-use std::sync::Arc;
-
-#[derive(Debug, Clone)]
-pub struct LocalMemoryMedium {
-    cache_object_store: Arc<InMemory>,
-}
-
-impl LocalMemoryMedium {
-    pub fn new() -> Self {
-        Self {
-            cache_object_store: Arc::new(InMemory::new()),
-        }
-    }
-}
-
-impl Default for LocalMemoryMedium {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
-impl Display for LocalMemoryMedium {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        write!(f, "Cache medium with local memory")
-    }
-}
-
-impl CacheMedium for LocalMemoryMedium {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn get_object_store(&self) -> Arc<dyn ObjectStore> {
-        self.cache_object_store.clone()
-    }
-
-    fn get_mapping_location(
-        &self,
-        source_location: &Path,
-        source_object_store: &ObjectStoreWithKey,
-    ) -> Path {
-        let cache_location = format!(
-            "{}{DELIMITER}{}{DELIMITER}{source_location}",
-            "memory",
-            source_object_store.key(),
-        );
-        Path::from(cache_location)
-    }
-}
diff --git a/ballista/core/src/cache_layer/medium/mod.rs 
b/ballista/core/src/cache_layer/medium/mod.rs
deleted file mode 100644
index f2082e23..00000000
--- a/ballista/core/src/cache_layer/medium/mod.rs
+++ /dev/null
@@ -1,42 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::cache_layer::object_store::ObjectStoreWithKey;
-use object_store::path::Path;
-use object_store::ObjectStore;
-use std::any::Any;
-use std::fmt::{Debug, Display};
-use std::sync::Arc;
-
-pub mod local_disk;
-pub mod local_memory;
-
-pub trait CacheMedium: Debug + Send + Sync + Display + 'static {
-    /// Returns the cache layer policy as [`Any`](std::any::Any) so that it 
can be
-    /// downcast to a specific implementation.
-    fn as_any(&self) -> &dyn Any;
-
-    /// Get the ObjectStore for the cache storage
-    fn get_object_store(&self) -> Arc<dyn ObjectStore>;
-
-    /// Get the mapping location on the cache ObjectStore for the source 
location on the source ObjectStore
-    fn get_mapping_location(
-        &self,
-        source_location: &Path,
-        source_object_store: &ObjectStoreWithKey,
-    ) -> Path;
-}
diff --git a/ballista/core/src/cache_layer/mod.rs 
b/ballista/core/src/cache_layer/mod.rs
deleted file mode 100644
index 86e52395..00000000
--- a/ballista/core/src/cache_layer/mod.rs
+++ /dev/null
@@ -1,128 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::cache_layer::medium::local_disk::LocalDiskMedium;
-use crate::cache_layer::medium::local_memory::LocalMemoryMedium;
-use crate::cache_layer::policy::file::FileCacheLayer;
-use std::sync::Arc;
-
-pub mod medium;
-pub mod object_store;
-pub mod policy;
-
-#[derive(Debug, Clone)]
-pub enum CacheLayer {
-    /// The local disk will be used as the cache layer medium
-    /// and the cache level will be the whole file.
-    LocalDiskFile(Arc<FileCacheLayer<LocalDiskMedium>>),
-
-    /// The local memory will be used as the cache layer medium
-    /// and the cache level will be the whole file.
-    LocalMemoryFile(Arc<FileCacheLayer<LocalMemoryMedium>>),
-}
-
-#[cfg(test)]
-mod tests {
-    use ballista_cache::loading_cache::LoadingCache;
-    use futures::TryStreamExt;
-    use object_store::local::LocalFileSystem;
-    use object_store::path::Path;
-    use object_store::{GetResultPayload, ObjectStore};
-    use std::io::Write;
-    use std::sync::Arc;
-    use tempfile::NamedTempFile;
-
-    use crate::cache_layer::medium::local_memory::LocalMemoryMedium;
-    use crate::cache_layer::object_store::file::FileCacheObjectStore;
-    use crate::cache_layer::object_store::ObjectStoreWithKey;
-    use crate::cache_layer::policy::file::FileCacheLayer;
-    use crate::error::{BallistaError, Result};
-
-    #[tokio::test]
-    async fn test_cache_file_to_memory() -> Result<()> {
-        let test_data = "test_cache_file_to_memory";
-        let test_bytes = test_data.as_bytes();
-
-        let mut test_file = NamedTempFile::new()?;
-        let source_location = Path::from(test_file.as_ref().to_str().unwrap());
-
-        let test_size = test_file.write(test_bytes)?;
-        assert_eq!(test_bytes.len(), test_size);
-
-        // Check the testing data on the source object store
-        let source_object_store = Arc::new(LocalFileSystem::new());
-        let source_key = "file";
-        let source_object_store_with_key = Arc::new(ObjectStoreWithKey::new(
-            source_key.to_string(),
-            source_object_store.clone(),
-        ));
-        let actual_source = 
source_object_store.get(&source_location).await.unwrap();
-        match actual_source.payload {
-            GetResultPayload::File(file, _) => {
-                assert_eq!(test_bytes.len(), file.metadata()?.len() as usize);
-            }
-            _ => {
-                return Err(BallistaError::General(
-                    "File instead of data stream should be 
returned".to_string(),
-                ))
-            }
-        }
-
-        // Check the testing data on the cache object store
-        let cache_medium = LocalMemoryMedium::new();
-        let cache_layer = FileCacheLayer::new(1000, 1, cache_medium);
-        let cache_meta = cache_layer
-            .cache()
-            .get(
-                source_location.clone(),
-                source_object_store_with_key.clone(),
-            )
-            .await;
-        assert_eq!(test_bytes.len(), cache_meta.size);
-
-        let cache_object_store = FileCacheObjectStore::new(
-            Arc::new(cache_layer),
-            source_object_store_with_key.clone(),
-        );
-        let actual_cache = 
cache_object_store.get(&source_location).await.unwrap();
-        match actual_cache.payload {
-            GetResultPayload::File(_, _) => {
-                return Err(BallistaError::General(
-                    "Data stream instead of file should be 
returned".to_string(),
-                ))
-            }
-            GetResultPayload::Stream(s) => {
-                let mut buf: Vec<u8> = vec![];
-                s.try_fold(&mut buf, |acc, part| async move {
-                    let mut part: Vec<u8> = part.into();
-                    acc.append(&mut part);
-                    Ok(acc)
-                })
-                .await
-                .unwrap();
-                let actual_cache_data = String::from_utf8(buf).unwrap();
-                assert_eq!(test_data, actual_cache_data);
-            }
-        }
-
-        test_file.close()?;
-
-        std::mem::forget(cache_object_store);
-
-        Ok(())
-    }
-}
diff --git a/ballista/core/src/cache_layer/object_store/file.rs 
b/ballista/core/src/cache_layer/object_store/file.rs
deleted file mode 100644
index 8229af1c..00000000
--- a/ballista/core/src/cache_layer/object_store/file.rs
+++ /dev/null
@@ -1,267 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::cache_layer::medium::CacheMedium;
-use crate::cache_layer::object_store::ObjectStoreWithKey;
-use crate::cache_layer::policy::file::FileCacheLayer;
-use crate::error::BallistaError;
-use async_trait::async_trait;
-use ballista_cache::loading_cache::LoadingCache;
-use bytes::Bytes;
-use futures::stream::{self, BoxStream, StreamExt};
-use log::info;
-use object_store::path::Path;
-use object_store::{
-    Error, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, 
ObjectStore,
-    PutMultipartOpts, PutOptions, PutPayload, PutResult,
-};
-use std::fmt::{Debug, Display, Formatter};
-use std::ops::Range;
-use std::sync::Arc;
-
-#[derive(Debug)]
-pub struct FileCacheObjectStore<M>
-where
-    M: CacheMedium,
-{
-    cache_layer: Arc<FileCacheLayer<M>>,
-    inner: Arc<ObjectStoreWithKey>,
-}
-
-impl<M> FileCacheObjectStore<M>
-where
-    M: CacheMedium,
-{
-    pub fn new(
-        cache_layer: Arc<FileCacheLayer<M>>,
-        inner: Arc<ObjectStoreWithKey>,
-    ) -> Self {
-        Self { cache_layer, inner }
-    }
-}
-
-impl<M> Display for FileCacheObjectStore<M>
-where
-    M: CacheMedium,
-{
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        write!(
-            f,
-            "Object store {} with file level cache on {}",
-            self.inner,
-            self.cache_layer.cache_store()
-        )
-    }
-}
-
-#[async_trait]
-impl<M> ObjectStore for FileCacheObjectStore<M>
-where
-    M: CacheMedium,
-{
-    async fn put(
-        &self,
-        _location: &Path,
-        _bytes: PutPayload,
-    ) -> object_store::Result<PutResult> {
-        Err(Error::NotSupported {
-            source: Box::new(BallistaError::General(
-                "Write path is not supported".to_string(),
-            )),
-        })
-    }
-
-    async fn put_opts(
-        &self,
-        _location: &Path,
-        _bytes: PutPayload,
-        _opts: PutOptions,
-    ) -> object_store::Result<PutResult> {
-        Err(Error::NotSupported {
-            source: Box::new(BallistaError::General(
-                "Write path is not supported".to_string(),
-            )),
-        })
-    }
-
-    async fn put_multipart(
-        &self,
-        _location: &Path,
-    ) -> object_store::Result<Box<dyn MultipartUpload>> {
-        Err(Error::NotSupported {
-            source: Box::new(BallistaError::General(
-                "Write path is not supported".to_string(),
-            )),
-        })
-    }
-
-    async fn put_multipart_opts(
-        &self,
-        _location: &Path,
-        _opts: PutMultipartOpts,
-    ) -> object_store::Result<Box<dyn MultipartUpload>> {
-        Err(Error::NotSupported {
-            source: Box::new(BallistaError::General(
-                "Write path is not supported".to_string(),
-            )),
-        })
-    }
-
-    /// If it already exists in cache, use the cached result.
-    /// Otherwise, trigger a task to load the data into cache; At the 
meanwhile,
-    /// get the result from the data source
-    async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
-        if let Some(cache_object_mata) =
-            self.cache_layer.cache().get_if_present(location.clone())
-        {
-            info!("Data for {} is cached", location);
-            let cache_location = &cache_object_mata.location;
-            self.cache_layer.cache_store().get(cache_location).await
-        } else {
-            let io_runtime = self.cache_layer.io_runtime();
-            let cache_layer = self.cache_layer.clone();
-            let key = location.clone();
-            let extra = self.inner.clone();
-            io_runtime.spawn(async move {
-                info!("Going to cache data for {}", key);
-                cache_layer.cache().get(key.clone(), extra).await;
-                info!("Data for {} has been cached", key);
-            });
-            self.inner.get(location).await
-        }
-    }
-
-    async fn get_opts(
-        &self,
-        location: &Path,
-        options: GetOptions,
-    ) -> object_store::Result<GetResult> {
-        if let Some(cache_object_mata) =
-            self.cache_layer.cache().get_if_present(location.clone())
-        {
-            info!("Data for {} is cached", location);
-            let cache_location = &cache_object_mata.location;
-            self.cache_layer
-                .cache_store()
-                .get_opts(cache_location, options)
-                .await
-        } else {
-            let io_runtime = self.cache_layer.io_runtime();
-            let cache_layer = self.cache_layer.clone();
-            let key = location.clone();
-            let extra = self.inner.clone();
-            io_runtime.spawn(async move {
-                info!("Going to cache data for {}", key);
-                cache_layer.cache().get(key.clone(), extra).await;
-                info!("Data for {} has been cached", key);
-            });
-            self.inner.get_opts(location, options).await
-        }
-    }
-
-    /// If it already exists in cache, use the cached result.
-    /// Otherwise, trigger a task to load the data into cache; At the 
meanwhile,
-    /// get the result from the data source
-    async fn get_range(
-        &self,
-        location: &Path,
-        range: Range<usize>,
-    ) -> object_store::Result<Bytes> {
-        if let Some(cache_object_mata) =
-            self.cache_layer.cache().get_if_present(location.clone())
-        {
-            info!("Data for {} is cached", location);
-            let cache_location = &cache_object_mata.location;
-            self.cache_layer
-                .cache_store()
-                .get_range(cache_location, range)
-                .await
-        } else {
-            let io_runtime = self.cache_layer.io_runtime();
-            let cache_layer = self.cache_layer.clone();
-            let key = location.clone();
-            let extra = self.inner.clone();
-            io_runtime.spawn(async move {
-                info!("Going to cache data for {}", key);
-                cache_layer.cache().get(key.clone(), extra).await;
-                info!("Data for {} has been cached", key);
-            });
-            self.inner.get_range(location, range).await
-        }
-    }
-
-    /// If it already exists in cache, use the cached result.
-    /// Otherwise, get the result from the data source.
-    /// It will not trigger the task to load data into cache.
-    async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
-        if let Some(cache_object_mata) =
-            self.cache_layer.cache().get_if_present(location.clone())
-        {
-            let cache_location = &cache_object_mata.location;
-            self.cache_layer.cache_store().head(cache_location).await
-        } else {
-            self.inner.head(location).await
-        }
-    }
-
-    async fn delete(&self, _location: &Path) -> object_store::Result<()> {
-        Err(Error::NotSupported {
-            source: Box::new(BallistaError::General(
-                "Delete is not supported".to_string(),
-            )),
-        })
-    }
-
-    fn list(
-        &self,
-        _prefix: Option<&Path>,
-    ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
-        stream::once(async {
-            Err(Error::NotSupported {
-                source: Box::new(BallistaError::General(
-                    "List is not supported".to_string(),
-                )),
-            })
-        })
-        .boxed()
-    }
-
-    async fn list_with_delimiter(
-        &self,
-        _prefix: Option<&Path>,
-    ) -> object_store::Result<ListResult> {
-        Err(Error::NotSupported {
-            source: Box::new(BallistaError::General("List is not 
supported".to_string())),
-        })
-    }
-
-    async fn copy(&self, _from: &Path, _to: &Path) -> object_store::Result<()> 
{
-        Err(Error::NotSupported {
-            source: Box::new(BallistaError::General("Copy is not 
supported".to_string())),
-        })
-    }
-
-    async fn copy_if_not_exists(
-        &self,
-        _from: &Path,
-        _to: &Path,
-    ) -> object_store::Result<()> {
-        Err(Error::NotSupported {
-            source: Box::new(BallistaError::General("Copy is not 
supported".to_string())),
-        })
-    }
-}
diff --git a/ballista/core/src/cache_layer/object_store/mod.rs 
b/ballista/core/src/cache_layer/object_store/mod.rs
deleted file mode 100644
index 71a3464e..00000000
--- a/ballista/core/src/cache_layer/object_store/mod.rs
+++ /dev/null
@@ -1,168 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-pub mod file;
-
-use async_trait::async_trait;
-use bytes::Bytes;
-use futures::stream::BoxStream;
-use object_store::path::Path;
-use object_store::{
-    GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, 
ObjectStore,
-    PutMultipartOpts, PutOptions, PutPayload, PutResult,
-};
-use std::fmt::{Debug, Display, Formatter};
-use std::ops::Range;
-use std::sync::Arc;
-
-/// An [`ObjectStore`] wrapper with a specific key which is used for 
registration in [`ObjectStoreRegistry`].
-///
-/// The [`key`] can be used for the cache path prefix.
-#[derive(Debug)]
-pub struct ObjectStoreWithKey {
-    key: String,
-    inner: Arc<dyn ObjectStore>,
-}
-
-impl ObjectStoreWithKey {
-    pub fn new(key: String, inner: Arc<dyn ObjectStore>) -> Self {
-        Self { key, inner }
-    }
-
-    pub fn key(&self) -> &str {
-        &self.key
-    }
-}
-
-impl Display for ObjectStoreWithKey {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        write!(
-            f,
-            "Registered object store {} with key {}",
-            self.inner, self.key,
-        )
-    }
-}
-
-#[async_trait]
-impl ObjectStore for ObjectStoreWithKey {
-    async fn put(
-        &self,
-        location: &Path,
-        bytes: PutPayload,
-    ) -> object_store::Result<PutResult> {
-        self.inner.put(location, bytes).await
-    }
-
-    async fn put_opts(
-        &self,
-        location: &Path,
-        bytes: PutPayload,
-        opts: PutOptions,
-    ) -> object_store::Result<PutResult> {
-        self.inner.put_opts(location, bytes, opts).await
-    }
-
-    async fn put_multipart(
-        &self,
-        location: &Path,
-    ) -> object_store::Result<Box<dyn MultipartUpload>> {
-        self.inner.put_multipart(location).await
-    }
-
-    async fn put_multipart_opts(
-        &self,
-        location: &Path,
-        opts: PutMultipartOpts,
-    ) -> object_store::Result<Box<dyn MultipartUpload>> {
-        self.inner.put_multipart_opts(location, opts).await
-    }
-
-    async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
-        self.inner.get(location).await
-    }
-
-    async fn get_opts(
-        &self,
-        location: &Path,
-        options: GetOptions,
-    ) -> object_store::Result<GetResult> {
-        self.inner.get_opts(location, options).await
-    }
-
-    async fn get_range(
-        &self,
-        location: &Path,
-        range: Range<usize>,
-    ) -> object_store::Result<Bytes> {
-        self.inner.get_range(location, range).await
-    }
-
-    async fn get_ranges(
-        &self,
-        location: &Path,
-        ranges: &[Range<usize>],
-    ) -> object_store::Result<Vec<Bytes>> {
-        self.inner.get_ranges(location, ranges).await
-    }
-
-    async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
-        self.inner.head(location).await
-    }
-
-    async fn delete(&self, location: &Path) -> object_store::Result<()> {
-        self.inner.delete(location).await
-    }
-
-    fn list(
-        &self,
-        prefix: Option<&Path>,
-    ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
-        self.inner.list(prefix)
-    }
-
-    async fn list_with_delimiter(
-        &self,
-        prefix: Option<&Path>,
-    ) -> object_store::Result<ListResult> {
-        self.inner.list_with_delimiter(prefix).await
-    }
-
-    async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
-        self.inner.copy(from, to).await
-    }
-
-    async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> 
{
-        self.inner.rename(from, to).await
-    }
-
-    async fn copy_if_not_exists(
-        &self,
-        from: &Path,
-        to: &Path,
-    ) -> object_store::Result<()> {
-        self.inner.copy_if_not_exists(from, to).await
-    }
-
-    async fn rename_if_not_exists(
-        &self,
-        from: &Path,
-        to: &Path,
-    ) -> object_store::Result<()> {
-        self.inner.rename_if_not_exists(from, to).await
-    }
-}
diff --git a/ballista/core/src/cache_layer/policy/file.rs 
b/ballista/core/src/cache_layer/policy/file.rs
deleted file mode 100644
index af4865d9..00000000
--- a/ballista/core/src/cache_layer/policy/file.rs
+++ /dev/null
@@ -1,302 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::cache_layer::medium::CacheMedium;
-use crate::cache_layer::object_store::ObjectStoreWithKey;
-use crate::error::{BallistaError, Result};
-use async_trait::async_trait;
-use ballista_cache::backend::policy::lru::lru_cache::LruCache;
-use ballista_cache::backend::policy::lru::ResourceCounter;
-use ballista_cache::listener::cache_policy::{
-    CachePolicyListener, CachePolicyWithListener,
-};
-use ballista_cache::loading_cache::loader::CacheLoader;
-use ballista_cache::{
-    create_loading_cache_with_metrics, DefaultLoadingCache, 
LoadingCacheMetrics,
-};
-use log::{error, info, warn};
-use object_store::path::Path;
-use object_store::{ObjectMeta, ObjectStore, PutPayload};
-use std::ops::Range;
-use std::sync::Arc;
-use tokio::runtime::Runtime;
-
-type DefaultFileLoadingCache<M> =
-    DefaultLoadingCache<Path, ObjectMeta, FileCacheLoader<M>>;
-type FileCacheMetrics = LoadingCacheMetrics<Path, ObjectMeta>;
-
-#[derive(Debug)]
-pub struct FileCacheLayer<M>
-where
-    M: CacheMedium,
-{
-    cache_store: Arc<dyn ObjectStore>,
-    loading_cache: DefaultFileLoadingCache<M>,
-    io_runtime: Runtime,
-    metrics: Arc<FileCacheMetrics>,
-}
-
-impl<M> FileCacheLayer<M>
-where
-    M: CacheMedium,
-{
-    pub fn new(capacity: usize, cache_io_concurrency: u32, cache_medium: M) -> 
Self {
-        let cache_store = cache_medium.get_object_store();
-
-        let cache_counter = FileCacheCounter::new(capacity);
-        let lru_cache = LruCache::with_resource_counter(cache_counter);
-        let file_cache_loader = Arc::new(FileCacheLoader::new(cache_medium));
-        let cache_with_removal_listener =
-            CachePolicyWithListener::new(lru_cache, 
vec![file_cache_loader.clone()]);
-        let (loading_cache, metrics) = create_loading_cache_with_metrics(
-            cache_with_removal_listener,
-            file_cache_loader,
-        );
-        let io_runtime = tokio::runtime::Builder::new_multi_thread()
-            .enable_all()
-            .thread_name("loading_cache")
-            .worker_threads(cache_io_concurrency as usize)
-            .build()
-            .expect("Creating tokio runtime");
-
-        Self {
-            cache_store,
-            loading_cache,
-            io_runtime,
-            metrics,
-        }
-    }
-
-    pub fn cache_store(&self) -> Arc<dyn ObjectStore> {
-        self.cache_store.clone()
-    }
-
-    pub fn cache(&self) -> &DefaultFileLoadingCache<M> {
-        &self.loading_cache
-    }
-
-    pub fn io_runtime(&self) -> &Runtime {
-        &self.io_runtime
-    }
-
-    pub fn metrics(&self) -> &FileCacheMetrics {
-        self.metrics.as_ref()
-    }
-}
-
-#[derive(Debug)]
-pub struct FileCacheLoader<M>
-where
-    M: CacheMedium,
-{
-    cache_medium: Arc<M>,
-}
-
-impl<M> FileCacheLoader<M>
-where
-    M: CacheMedium,
-{
-    fn new(cache_medium: M) -> Self {
-        Self {
-            cache_medium: Arc::new(cache_medium),
-        }
-    }
-
-    fn remove_object(&self, source_path: Path, object_meta: ObjectMeta) {
-        let cache_store = self.cache_medium.get_object_store();
-        let location = object_meta.location;
-        tokio::runtime::Handle::try_current().unwrap().block_on( async {
-            if let Err(e) = cache_store.delete(&location).await {
-                error!("Fail to delete file {location} on the cache 
ObjectStore for source {source_path} due to {e}");
-            }
-        });
-    }
-}
-
-/// Will return the location of the cached file on the cache object store.
-///
-/// The last_modified of the ObjectMeta will be from the source file, which 
will be useful
-/// for checking whether the source file changed or not.
-///
-/// The size will be the one of cached file rather than the one of the source 
file in case of changing the data format
-async fn load_object<M>(
-    cache_medium: Arc<M>,
-    source_location: Path,
-    source_store: &ObjectStoreWithKey,
-) -> Result<ObjectMeta>
-where
-    M: CacheMedium,
-{
-    let source_meta = source_store.head(&source_location).await.map_err(|e| {
-        BallistaError::General(format!(
-            "Fail to read head info for {source_location} due to {e}"
-        ))
-    })?;
-
-    let cache_store = cache_medium.get_object_store();
-    let cache_location =
-        cache_medium.get_mapping_location(&source_location, source_store);
-
-    // Check whether the cache location exist or not. If exists, delete it 
first.
-    if cache_store.head(&cache_location).await.is_ok() {
-        if let Err(e) = cache_store.delete(&cache_location).await {
-            error!(
-                "Fail to delete file {cache_location} on the cache ObjectStore 
due to {e}"
-            );
-        }
-    }
-
-    info!(
-        "Going to cache object from {} to {}",
-        source_location, cache_location
-    );
-    let range = Range {
-        start: 0,
-        end: source_meta.size,
-    };
-    let data = source_store
-        .get_range(&source_location, range)
-        .await
-        .map_err(|e| {
-            BallistaError::General(format!(
-                "Fail to get file data from {source_location} due to {e}"
-            ))
-        })?;
-    info!(
-        "{} bytes will be cached for {}",
-        data.len(),
-        source_location
-    );
-    cache_store
-        .put(&cache_location, PutPayload::from_bytes(data))
-        .await
-        .map_err(|e| {
-            BallistaError::General(format!(
-                "Fail to write out data to {cache_location} due to {e}"
-            ))
-        })?;
-    info!(
-        "Object {} has already been cached to {}",
-        source_location, cache_location
-    );
-
-    let cache_meta = cache_store.head(&cache_location).await.map_err(|e| {
-        BallistaError::General(format!(
-            "Fail to read head info for {cache_location} due to {e}"
-        ))
-    })?;
-
-    Ok(ObjectMeta {
-        location: cache_location,
-        last_modified: source_meta.last_modified,
-        size: cache_meta.size,
-        e_tag: source_meta.e_tag,
-        version: None,
-    })
-}
-
-#[async_trait]
-impl<M> CacheLoader for FileCacheLoader<M>
-where
-    M: CacheMedium,
-{
-    type K = Path;
-    type V = ObjectMeta;
-    type Extra = Arc<ObjectStoreWithKey>;
-
-    async fn load(&self, source_location: Self::K, source_store: Self::Extra) 
-> Self::V {
-        match load_object(self.cache_medium.clone(), source_location, 
&source_store).await
-        {
-            Ok(object_meta) => object_meta,
-            Err(e) => panic!("{}", e),
-        }
-    }
-}
-
-impl<M> CachePolicyListener for FileCacheLoader<M>
-where
-    M: CacheMedium,
-{
-    type K = Path;
-    type V = ObjectMeta;
-
-    fn listen_on_get(&self, _k: Self::K, _v: Option<Self::V>) {
-        // Do nothing
-    }
-
-    fn listen_on_peek(&self, _k: Self::K, _v: Option<Self::V>) {
-        // Do nothing
-    }
-
-    fn listen_on_put(&self, _k: Self::K, _v: Self::V, _old_v: Option<Self::V>) 
{
-        // Do nothing
-    }
-
-    fn listen_on_remove(&self, k: Self::K, v: Option<Self::V>) {
-        if let Some(v) = v {
-            self.remove_object(k, v);
-        } else {
-            warn!("The entry does not exist for key {k}");
-        }
-    }
-
-    fn listen_on_pop(&self, entry: (Self::K, Self::V)) {
-        self.remove_object(entry.0, entry.1);
-    }
-}
-
-#[derive(Debug, Clone, Copy)]
-pub struct FileCacheCounter {
-    /// The maximum data size to be cached
-    capacity: usize,
-    /// The data size already be cached
-    cached_size: usize,
-}
-
-impl FileCacheCounter {
-    pub fn new(capacity: usize) -> Self {
-        FileCacheCounter {
-            capacity,
-            cached_size: 0,
-        }
-    }
-
-    pub fn capacity(&self) -> usize {
-        self.capacity
-    }
-
-    pub fn cached_size(&self) -> usize {
-        self.cached_size
-    }
-}
-
-impl ResourceCounter for FileCacheCounter {
-    type K = Path;
-    type V = ObjectMeta;
-
-    fn consume(&mut self, _k: &Self::K, v: &Self::V) {
-        self.cached_size += v.size;
-    }
-
-    fn restore(&mut self, _k: &Self::K, v: &Self::V) {
-        self.cached_size -= v.size;
-    }
-
-    fn exceed_capacity(&self) -> bool {
-        self.cached_size > self.capacity
-    }
-}
diff --git a/ballista/core/src/cache_layer/policy/mod.rs 
b/ballista/core/src/cache_layer/policy/mod.rs
deleted file mode 100644
index 0cb4c86b..00000000
--- a/ballista/core/src/cache_layer/policy/mod.rs
+++ /dev/null
@@ -1,18 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-pub mod file;
diff --git a/ballista/core/src/config.rs b/ballista/core/src/config.rs
index 46424ecf..746a0be9 100644
--- a/ballista/core/src/config.rs
+++ b/ballista/core/src/config.rs
@@ -37,8 +37,6 @@ pub const BALLISTA_REPARTITION_AGGREGATIONS: &str = 
"ballista.repartition.aggreg
 pub const BALLISTA_REPARTITION_WINDOWS: &str = "ballista.repartition.windows";
 pub const BALLISTA_PARQUET_PRUNING: &str = "ballista.parquet.pruning";
 pub const BALLISTA_COLLECT_STATISTICS: &str = "ballista.collect_statistics";
-/// Indicate whether to enable to data cache for a task
-pub const BALLISTA_DATA_CACHE_ENABLED: &str = "ballista.data_cache.enabled";
 
 pub const BALLISTA_WITH_INFORMATION_SCHEMA: &str = 
"ballista.with_information_schema";
 /// give a plugin files dir, and then the dynamic library files in this dir 
will be load when scheduler state init.
diff --git a/ballista/core/src/lib.rs b/ballista/core/src/lib.rs
index 5306e8b9..c52d2ef4 100644
--- a/ballista/core/src/lib.rs
+++ b/ballista/core/src/lib.rs
@@ -22,8 +22,6 @@ pub fn print_version() {
     println!("Ballista version: {BALLISTA_VERSION}")
 }
 
-#[cfg(not(windows))]
-pub mod cache_layer;
 pub mod client;
 pub mod config;
 pub mod consistent_hash;
diff --git a/ballista/core/src/object_store_registry/cache.rs 
b/ballista/core/src/object_store_registry/cache.rs
deleted file mode 100644
index 621e30a1..00000000
--- a/ballista/core/src/object_store_registry/cache.rs
+++ /dev/null
@@ -1,86 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::cache_layer::object_store::file::FileCacheObjectStore;
-use crate::cache_layer::object_store::ObjectStoreWithKey;
-use crate::cache_layer::CacheLayer;
-use crate::object_store_registry::BallistaObjectStoreRegistry;
-use datafusion::datasource::object_store::ObjectStoreRegistry;
-use datafusion::execution::runtime_env::RuntimeConfig;
-use object_store::ObjectStore;
-use std::sync::Arc;
-use url::Url;
-
-/// Get a RuntimeConfig with CachedBasedObjectStoreRegistry
-pub fn with_cache_layer(config: RuntimeConfig, cache_layer: CacheLayer) -> 
RuntimeConfig {
-    let registry = Arc::new(BallistaObjectStoreRegistry::default());
-    let registry = Arc::new(CachedBasedObjectStoreRegistry::new(registry, 
cache_layer));
-    config.with_object_store_registry(registry)
-}
-
-/// An object store registry wrapped an existing one with a cache layer.
-///
-/// During [`get_store`], after getting the source [`ObjectStore`], based on 
the url,
-/// it will firstly be wrapped with a key which will be used as the cache 
prefix path.
-/// And then it will be wrapped with the [`cache_layer`].
-#[derive(Debug)]
-pub struct CachedBasedObjectStoreRegistry {
-    inner: Arc<dyn ObjectStoreRegistry>,
-    cache_layer: CacheLayer,
-}
-
-impl CachedBasedObjectStoreRegistry {
-    pub fn new(inner: Arc<dyn ObjectStoreRegistry>, cache_layer: CacheLayer) 
-> Self {
-        Self { inner, cache_layer }
-    }
-}
-
-impl ObjectStoreRegistry for CachedBasedObjectStoreRegistry {
-    fn register_store(
-        &self,
-        url: &Url,
-        store: Arc<dyn ObjectStore>,
-    ) -> Option<Arc<dyn ObjectStore>> {
-        self.inner.register_store(url, store)
-    }
-
-    fn get_store(&self, url: &Url) -> datafusion::common::Result<Arc<dyn 
ObjectStore>> {
-        let source_object_store = self.inner.get_store(url)?;
-        let object_store_with_key = Arc::new(ObjectStoreWithKey::new(
-            get_url_key(url),
-            source_object_store,
-        ));
-        Ok(match &self.cache_layer {
-            CacheLayer::LocalDiskFile(cache_layer) => Arc::new(
-                FileCacheObjectStore::new(cache_layer.clone(), 
object_store_with_key),
-            ),
-            CacheLayer::LocalMemoryFile(cache_layer) => Arc::new(
-                FileCacheObjectStore::new(cache_layer.clone(), 
object_store_with_key),
-            ),
-        })
-    }
-}
-
-/// Get the key of a url for object store cache prefix path.
-/// The credential info will be removed.
-fn get_url_key(url: &Url) -> String {
-    format!(
-        "{}://{}",
-        url.scheme(),
-        &url[url::Position::BeforeHost..url::Position::AfterPort],
-    )
-}
diff --git a/ballista/core/src/object_store_registry/mod.rs 
b/ballista/core/src/object_store_registry/mod.rs
index be527e71..2e394a7c 100644
--- a/ballista/core/src/object_store_registry/mod.rs
+++ b/ballista/core/src/object_store_registry/mod.rs
@@ -15,9 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#[cfg(not(windows))]
-pub mod cache;
-
 use datafusion::common::DataFusionError;
 use datafusion::datasource::object_store::{
     DefaultObjectStoreRegistry, ObjectStoreRegistry,
diff --git a/ballista/executor/src/execution_loop.rs 
b/ballista/executor/src/execution_loop.rs
index 0c169e2d..591c5c45 100644
--- a/ballista/executor/src/execution_loop.rs
+++ b/ballista/executor/src/execution_loop.rs
@@ -208,7 +208,7 @@ async fn run_received_task<T: 'static + AsLogicalPlan, U: 
'static + AsExecutionP
     for window_func in executor.window_functions.clone() {
         task_window_functions.insert(window_func.0, window_func.1);
     }
-    let runtime = executor.get_runtime(false);
+    let runtime = executor.get_runtime();
     let session_id = task.session_id.clone();
     let task_context = Arc::new(TaskContext::new(
         Some(task_identity.clone()),
diff --git a/ballista/executor/src/executor.rs 
b/ballista/executor/src/executor.rs
index 4e83b125..8ae8e6aa 100644
--- a/ballista/executor/src/executor.rs
+++ b/ballista/executor/src/executor.rs
@@ -75,11 +75,6 @@ pub struct Executor {
     /// Runtime environment for Executor
     runtime: Arc<RuntimeEnv>,
 
-    /// Runtime environment for Executor with data cache.
-    /// The difference with [`runtime`] is that it leverages a different 
[`object_store_registry`].
-    /// And others things are shared with [`runtime`].
-    runtime_with_data_cache: Option<Arc<RuntimeEnv>>,
-
     /// Collector for runtime execution metrics
     pub metrics_collector: Arc<dyn ExecutorMetricsCollector>,
 
@@ -100,7 +95,6 @@ impl Executor {
         metadata: ExecutorRegistration,
         work_dir: &str,
         runtime: Arc<RuntimeEnv>,
-        runtime_with_data_cache: Option<Arc<RuntimeEnv>>,
         metrics_collector: Arc<dyn ExecutorMetricsCollector>,
         concurrent_tasks: usize,
         execution_engine: Option<Arc<dyn ExecutionEngine>>,
@@ -123,7 +117,6 @@ impl Executor {
             // TODO: set to default window functions when they are moved to 
udwf
             window_functions: HashMap::new(),
             runtime,
-            runtime_with_data_cache,
             metrics_collector,
             concurrent_tasks,
             abort_handles: Default::default(),
@@ -134,16 +127,8 @@ impl Executor {
 }
 
 impl Executor {
-    pub fn get_runtime(&self, data_cache: bool) -> Arc<RuntimeEnv> {
-        if data_cache {
-            if let Some(runtime) = self.runtime_with_data_cache.clone() {
-                runtime
-            } else {
-                self.runtime.clone()
-            }
-        } else {
-            self.runtime.clone()
-        }
+    pub fn get_runtime(&self) -> Arc<RuntimeEnv> {
+        self.runtime.clone()
     }
 
     /// Execute one partition of a query stage and persist the result to disk 
in IPC format. On
@@ -363,7 +348,6 @@ mod test {
             executor_registration,
             &work_dir,
             ctx.runtime_env(),
-            None,
             Arc::new(LoggingMetricsCollector {}),
             2,
             None,
diff --git a/ballista/executor/src/executor_process.rs 
b/ballista/executor/src/executor_process.rs
index f438e2f6..c19f0656 100644
--- a/ballista/executor/src/executor_process.rs
+++ b/ballista/executor/src/executor_process.rs
@@ -40,14 +40,8 @@ use uuid::Uuid;
 use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
 use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode};
 
-#[cfg(not(windows))]
-use ballista_core::cache_layer::{
-    medium::local_disk::LocalDiskMedium, policy::file::FileCacheLayer, 
CacheLayer,
-};
 use ballista_core::config::{DataCachePolicy, LogRotationPolicy, 
TaskSchedulingPolicy};
 use ballista_core::error::BallistaError;
-#[cfg(not(windows))]
-use 
ballista_core::object_store_registry::cache::CachedBasedObjectStoreRegistry;
 use ballista_core::object_store_registry::with_object_store_registry;
 use ballista_core::serde::protobuf::executor_resource::Resource;
 use ballista_core::serde::protobuf::executor_status::Status;
@@ -195,50 +189,12 @@ pub async fn start_executor_process(opt: 
Arc<ExecutorProcessConfig>) -> Result<(
         })?)
     };
 
-    // Set the object store registry
-    #[cfg(not(windows))]
-    let runtime_with_data_cache = {
-        let cache_dir = opt.cache_dir.clone();
-        let cache_capacity = opt.cache_capacity;
-        let cache_io_concurrency = opt.cache_io_concurrency;
-        let cache_layer =
-            opt.data_cache_policy
-                .map(|data_cache_policy| match data_cache_policy {
-                    DataCachePolicy::LocalDiskFile => {
-                        let cache_dir = cache_dir.unwrap();
-                        let cache_layer = FileCacheLayer::new(
-                            cache_capacity as usize,
-                            cache_io_concurrency,
-                            LocalDiskMedium::new(cache_dir),
-                        );
-                        CacheLayer::LocalDiskFile(Arc::new(cache_layer))
-                    }
-                });
-        if let Some(cache_layer) = cache_layer {
-            let registry = Arc::new(CachedBasedObjectStoreRegistry::new(
-                runtime.object_store_registry.clone(),
-                cache_layer,
-            ));
-            Some(Arc::new(RuntimeEnv {
-                memory_pool: runtime.memory_pool.clone(),
-                disk_manager: runtime.disk_manager.clone(),
-                cache_manager: runtime.cache_manager.clone(),
-                object_store_registry: registry,
-            }))
-        } else {
-            None
-        }
-    };
-    #[cfg(windows)]
-    let runtime_with_data_cache = { None };
-
     let metrics_collector = Arc::new(LoggingMetricsCollector::default());
 
     let executor = Arc::new(Executor::new(
         executor_meta,
         &work_dir,
         runtime,
-        runtime_with_data_cache,
         metrics_collector,
         concurrent_tasks,
         opt.execution_engine.clone(),
diff --git a/ballista/executor/src/executor_server.rs 
b/ballista/executor/src/executor_server.rs
index 5754334c..6e3d5589 100644
--- a/ballista/executor/src/executor_server.rs
+++ b/ballista/executor/src/executor_server.rs
@@ -28,7 +28,6 @@ use log::{debug, error, info, warn};
 use tonic::transport::Channel;
 use tonic::{Request, Response, Status};
 
-use ballista_core::config::BALLISTA_DATA_CACHE_ENABLED;
 use ballista_core::error::BallistaError;
 use ballista_core::serde::protobuf::{
     executor_grpc_server::{ExecutorGrpc, ExecutorGrpcServer},
@@ -344,10 +343,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> ExecutorServer<T,
 
         let task_context = {
             let task_props = task.props;
-            let data_cache = task_props
-                .get(BALLISTA_DATA_CACHE_ENABLED)
-                .map(|data_cache| data_cache.parse().unwrap_or(false))
-                .unwrap_or(false);
             let mut config = ConfigOptions::new();
             for (k, v) in task_props.iter() {
                 if let Err(e) = config.set(k, v) {
@@ -357,10 +352,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> ExecutorServer<T,
             let session_config = SessionConfig::from(config);
 
             let function_registry = task.function_registry;
-            if data_cache {
-                info!("Data cache will be enabled for {}", task_identity);
-            }
-            let runtime = self.executor.get_runtime(data_cache);
+            let runtime = self.executor.get_runtime();
 
             Arc::new(TaskContext::new(
                 Some(task_identity.clone()),
@@ -649,7 +641,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> ExecutorGrpc
                     scheduler_id: scheduler_id.clone(),
                     task: get_task_definition(
                         task,
-                        self.executor.get_runtime(false),
+                        self.executor.get_runtime(),
                         self.executor.scalar_functions.clone(),
                         self.executor.aggregate_functions.clone(),
                         self.executor.window_functions.clone(),
@@ -677,7 +669,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> ExecutorGrpc
         for multi_task in multi_tasks {
             let multi_task: Vec<TaskDefinition> = get_task_definition_vec(
                 multi_task,
-                self.executor.get_runtime(false),
+                self.executor.get_runtime(),
                 self.executor.scalar_functions.clone(),
                 self.executor.aggregate_functions.clone(),
                 self.executor.window_functions.clone(),
diff --git a/ballista/executor/src/standalone.rs 
b/ballista/executor/src/standalone.rs
index 971af517..38e27713 100644
--- a/ballista/executor/src/standalone.rs
+++ b/ballista/executor/src/standalone.rs
@@ -82,7 +82,6 @@ pub async fn new_standalone_executor<
         executor_meta,
         &work_dir,
         Arc::new(RuntimeEnv::new(config).unwrap()),
-        None,
         Arc::new(LoggingMetricsCollector::default()),
         concurrent_tasks,
         None,
diff --git a/ballista/scheduler/src/cluster/mod.rs 
b/ballista/scheduler/src/cluster/mod.rs
index 5b9ecedf..81432056 100644
--- a/ballista/scheduler/src/cluster/mod.rs
+++ b/ballista/scheduler/src/cluster/mod.rs
@@ -371,7 +371,6 @@ pub(crate) async fn bind_task_bias(
                     stage_attempt_num: running_stage.stage_attempt_num,
                     task_id,
                     task_attempt: 
running_stage.task_failure_numbers[partition_id],
-                    data_cache: false,
                     plan: running_stage.plan.clone(),
                 };
                 schedulable_tasks.push((executor_id, task_desc));
@@ -460,7 +459,6 @@ pub(crate) async fn bind_task_round_robin(
                     stage_attempt_num: running_stage.stage_attempt_num,
                     task_id,
                     task_attempt: 
running_stage.task_failure_numbers[partition_id],
-                    data_cache: false,
                     plan: running_stage.plan.clone(),
                 };
                 schedulable_tasks.push((executor_id, task_desc));
@@ -565,7 +563,6 @@ pub(crate) async fn bind_task_consistent_hash(
                             stage_id: running_stage.stage_id,
                             partition_id,
                         };
-                        let data_cache = tolerance == 0;
                         let task_desc = TaskDescription {
                             session_id: session_id.clone(),
                             partition,
@@ -573,7 +570,6 @@ pub(crate) async fn bind_task_consistent_hash(
                             task_id,
                             task_attempt: running_stage.task_failure_numbers
                                 [partition_id],
-                            data_cache,
                             plan: running_stage.plan.clone(),
                         };
                         schedulable_tasks.push((executor_id, task_desc));
diff --git a/ballista/scheduler/src/state/execution_graph.rs 
b/ballista/scheduler/src/state/execution_graph.rs
index e72e3906..9e50742f 100644
--- a/ballista/scheduler/src/state/execution_graph.rs
+++ b/ballista/scheduler/src/state/execution_graph.rs
@@ -906,7 +906,6 @@ impl ExecutionGraph {
                     stage_attempt_num: stage.stage_attempt_num,
                     task_id,
                     task_attempt,
-                    data_cache: false,
                     plan: stage.plan.clone(),
                 })
             } else {
@@ -1456,7 +1455,6 @@ pub struct TaskDescription {
     pub stage_attempt_num: usize,
     pub task_id: usize,
     pub task_attempt: usize,
-    pub data_cache: bool,
     pub plan: Arc<dyn ExecutionPlan>,
 }
 
@@ -1465,7 +1463,7 @@ impl Debug for TaskDescription {
         let plan = 
DisplayableExecutionPlan::new(self.plan.as_ref()).indent(false);
         write!(
             f,
-            "TaskDescription[session_id: {},job: {}, stage: {}.{}, partition: 
{} task_id {}, task attempt {}, data cache {}]\n{}",
+            "TaskDescription[session_id: {},job: {}, stage: {}.{}, partition: 
{} task_id {}, task attempt {}]\n{}",
             self.session_id,
             self.partition.job_id,
             self.partition.stage_id,
@@ -1473,7 +1471,6 @@ impl Debug for TaskDescription {
             self.partition.partition_id,
             self.task_id,
             self.task_attempt,
-            self.data_cache,
             plan
         )
     }
diff --git a/ballista/scheduler/src/state/task_manager.rs 
b/ballista/scheduler/src/state/task_manager.rs
index 66714e6c..1c9d8dd6 100644
--- a/ballista/scheduler/src/state/task_manager.rs
+++ b/ballista/scheduler/src/state/task_manager.rs
@@ -27,8 +27,7 @@ use ballista_core::error::Result;
 
 use crate::cluster::JobState;
 use ballista_core::serde::protobuf::{
-    job_status, JobStatus, KeyValuePair, MultiTaskDefinition, TaskDefinition, 
TaskId,
-    TaskStatus,
+    job_status, JobStatus, MultiTaskDefinition, TaskDefinition, TaskId, 
TaskStatus,
 };
 use ballista_core::serde::scheduler::ExecutorMetadata;
 use ballista_core::serde::BallistaCodec;
@@ -47,7 +46,6 @@ use std::time::Duration;
 use std::time::{SystemTime, UNIX_EPOCH};
 use tokio::sync::RwLock;
 
-use ballista_core::config::BALLISTA_DATA_CACHE_ENABLED;
 use tracing::trace;
 
 type ActiveJobCache = Arc<DashMap<String, JobInfoCache>>;
@@ -496,13 +494,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
                 plan_buf
             };
 
-            let mut props = vec![];
-            if task.data_cache {
-                props.push(KeyValuePair {
-                    key: BALLISTA_DATA_CACHE_ENABLED.to_string(),
-                    value: "true".to_string(),
-                });
-            }
+            let props = vec![];
 
             let task_definition = TaskDefinition {
                 task_id: task.task_id as u32,
@@ -596,53 +588,26 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
                     .unwrap()
                     .as_millis() as u64;
 
-                let (tasks_with_data_cache, tasks_without_data_cache): 
(Vec<_>, Vec<_>) =
-                    tasks.into_iter().partition(|task| task.data_cache);
-
                 let mut multi_tasks = vec![];
-                if !tasks_with_data_cache.is_empty() {
-                    let task_ids = tasks_with_data_cache
-                        .into_iter()
-                        .map(|task| TaskId {
-                            task_id: task.task_id as u32,
-                            task_attempt_num: task.task_attempt as u32,
-                            partition_id: task.partition.partition_id as u32,
-                        })
-                        .collect();
-                    multi_tasks.push(MultiTaskDefinition {
-                        task_ids,
-                        job_id: job_id.clone(),
-                        stage_id: stage_id as u32,
-                        stage_attempt_num: stage_attempt_num as u32,
-                        plan: plan.clone(),
-                        session_id: session_id.clone(),
-                        launch_time,
-                        props: vec![KeyValuePair {
-                            key: BALLISTA_DATA_CACHE_ENABLED.to_string(),
-                            value: "true".to_string(),
-                        }],
-                    });
-                }
-                if !tasks_without_data_cache.is_empty() {
-                    let task_ids = tasks_without_data_cache
-                        .into_iter()
-                        .map(|task| TaskId {
-                            task_id: task.task_id as u32,
-                            task_attempt_num: task.task_attempt as u32,
-                            partition_id: task.partition.partition_id as u32,
-                        })
-                        .collect();
-                    multi_tasks.push(MultiTaskDefinition {
-                        task_ids,
-                        job_id,
-                        stage_id: stage_id as u32,
-                        stage_attempt_num: stage_attempt_num as u32,
-                        plan,
-                        session_id,
-                        launch_time,
-                        props: vec![],
-                    });
-                }
+
+                let task_ids = tasks
+                    .into_iter()
+                    .map(|task| TaskId {
+                        task_id: task.task_id as u32,
+                        task_attempt_num: task.task_attempt as u32,
+                        partition_id: task.partition.partition_id as u32,
+                    })
+                    .collect();
+                multi_tasks.push(MultiTaskDefinition {
+                    task_ids,
+                    job_id,
+                    stage_id: stage_id as u32,
+                    stage_attempt_num: stage_attempt_num as u32,
+                    plan,
+                    session_id,
+                    launch_time,
+                    props: vec![],
+                });
 
                 Ok(multi_tasks)
             } else {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to