This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 99c9e11e Remove cache functionality (#1076)
99c9e11e is described below
commit 99c9e11e1c55ae34c57b780dd86ded8e3457b1f8
Author: Marko Milenković <[email protected]>
AuthorDate: Sat Oct 12 15:51:57 2024 +0100
Remove cache functionality (#1076)
While it may be useful, I'd argue that it should not be
part of core Ballista. We see in the community
caches built on top of object_store.
If functionality like this is needed, it should be
implemented as a scheduler policy or similar.
Relates to: #1066 & #1067
---
Cargo.toml | 2 +-
ballista/cache/Cargo.toml | 38 --
ballista/cache/README.md | 22 -
ballista/cache/src/backend/mod.rs | 73 ---
ballista/cache/src/backend/policy/lru/lru_cache.rs | 337 ------------
ballista/cache/src/backend/policy/lru/mod.rs | 111 ----
ballista/cache/src/backend/policy/mod.rs | 61 ---
ballista/cache/src/lib.rs | 54 --
ballista/cache/src/listener/cache_policy.rs | 133 -----
ballista/cache/src/listener/loading_cache.rs | 197 -------
ballista/cache/src/listener/mod.rs | 19 -
.../src/loading_cache/cancellation_safe_future.rs | 179 -------
ballista/cache/src/loading_cache/driver.rs | 573 ---------------------
ballista/cache/src/loading_cache/loader.rs | 52 --
ballista/cache/src/loading_cache/mod.rs | 113 ----
ballista/cache/src/metrics/loading_cache.rs | 292 -----------
ballista/cache/src/metrics/mod.rs | 18 -
ballista/core/Cargo.toml | 1 -
ballista/core/src/cache_layer/medium/local_disk.rs | 69 ---
.../core/src/cache_layer/medium/local_memory.rs | 73 ---
ballista/core/src/cache_layer/medium/mod.rs | 42 --
ballista/core/src/cache_layer/mod.rs | 128 -----
ballista/core/src/cache_layer/object_store/file.rs | 267 ----------
ballista/core/src/cache_layer/object_store/mod.rs | 168 ------
ballista/core/src/cache_layer/policy/file.rs | 302 -----------
ballista/core/src/cache_layer/policy/mod.rs | 18 -
ballista/core/src/config.rs | 2 -
ballista/core/src/lib.rs | 2 -
ballista/core/src/object_store_registry/cache.rs | 86 ----
ballista/core/src/object_store_registry/mod.rs | 3 -
ballista/executor/src/execution_loop.rs | 2 +-
ballista/executor/src/executor.rs | 20 +-
ballista/executor/src/executor_process.rs | 44 --
ballista/executor/src/executor_server.rs | 14 +-
ballista/executor/src/standalone.rs | 1 -
ballista/scheduler/src/cluster/mod.rs | 4 -
ballista/scheduler/src/state/execution_graph.rs | 5 +-
ballista/scheduler/src/state/task_manager.rs | 77 +--
38 files changed, 29 insertions(+), 3573 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index ea1c8321..be204446 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,7 +17,7 @@
[workspace]
exclude = ["python"]
-members = ["ballista-cli", "ballista/cache", "ballista/client",
"ballista/core", "ballista/executor", "ballista/scheduler", "benchmarks",
"examples"]
+members = ["ballista-cli", "ballista/client", "ballista/core",
"ballista/executor", "ballista/scheduler", "benchmarks", "examples"]
resolver = "2"
[workspace.dependencies]
diff --git a/ballista/cache/Cargo.toml b/ballista/cache/Cargo.toml
deleted file mode 100644
index 78accd29..00000000
--- a/ballista/cache/Cargo.toml
+++ /dev/null
@@ -1,38 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-[package]
-name = "ballista-cache"
-description = "Ballista Cache"
-license = "Apache-2.0"
-homepage = "https://github.com/apache/arrow-ballista"
-repository = "https://github.com/apache/arrow-ballista"
-readme = "README.md"
-authors = ["Apache DataFusion <[email protected]>"]
-version = "0.12.0"
-edition = "2021"
-
-# See more keys and their definitions at
https://doc.rust-lang.org/cargo/reference/manifest.html
-
-[dependencies]
-async-trait = "0.1.64"
-futures = "0.3"
-hashbrown = "0.14"
-hashlink = "0.8.4"
-log = "0.4"
-parking_lot = "0.12"
-tokio = { version = "1.25", features = ["macros", "parking_lot",
"rt-multi-thread", "sync", "time"] }
diff --git a/ballista/cache/README.md b/ballista/cache/README.md
deleted file mode 100644
index 4cce0d2b..00000000
--- a/ballista/cache/README.md
+++ /dev/null
@@ -1,22 +0,0 @@
-<!---
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
--->
-
-# Ballista Cache Library
-
-This crate contains the Ballista cache library which is used as a dependency
from other Ballista crates.
diff --git a/ballista/cache/src/backend/mod.rs
b/ballista/cache/src/backend/mod.rs
deleted file mode 100644
index f24119e8..00000000
--- a/ballista/cache/src/backend/mod.rs
+++ /dev/null
@@ -1,73 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-pub mod policy;
-
-use crate::backend::policy::CachePolicy;
-use std::fmt::Debug;
-use std::hash::Hash;
-
-/// Backend to keep and manage stored entries.
-///
-/// A backend might remove entries at any point, e.g. due to memory pressure
or expiration.
-#[derive(Debug)]
-pub struct CacheBackend<K, V>
-where
- K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
- V: Clone + Debug + Send + 'static,
-{
- policy: Box<dyn CachePolicy<K = K, V = V>>,
-}
-
-impl<K, V> CacheBackend<K, V>
-where
- K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
- V: Clone + Debug + Send + 'static,
-{
- pub fn new(policy: impl CachePolicy<K = K, V = V>) -> Self {
- Self {
- policy: Box::new(policy),
- }
- }
-
- /// Get value for given key if it exists.
- pub fn get(&mut self, k: &K) -> Option<V> {
- self.policy.get(k)
- }
-
- /// Peek value for given key if it exists.
- ///
- /// In contrast to [`get`](Self::get) this will only return a value if
there is a stored value.
- /// This will not change the cache contents.
- pub fn peek(&mut self, k: &K) -> Option<V> {
- self.policy.peek(k)
- }
-
- /// Put value for given key.
- ///
- /// If a key already exists, its old value will be returned.
- pub fn put(&mut self, k: K, v: V) -> Option<V> {
- self.policy.put(k, v).0
- }
-
- /// Remove value for given key.
- ///
- /// If a key does not exist, none will be returned.
- pub fn remove(&mut self, k: &K) -> Option<V> {
- self.policy.remove(k)
- }
-}
diff --git a/ballista/cache/src/backend/policy/lru/lru_cache.rs
b/ballista/cache/src/backend/policy/lru/lru_cache.rs
deleted file mode 100644
index 284a4a28..00000000
--- a/ballista/cache/src/backend/policy/lru/lru_cache.rs
+++ /dev/null
@@ -1,337 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::backend::policy::lru::ResourceCounter;
-use crate::backend::policy::{lru::LruCachePolicy, CachePolicy,
CachePolicyPutResult};
-use hashbrown::hash_map::DefaultHashBuilder;
-use hashlink::linked_hash_map::{self, IntoIter, Iter, IterMut, LinkedHashMap};
-use std::any::Any;
-use std::fmt;
-use std::fmt::Debug;
-use std::hash::{BuildHasher, Hash};
-
-pub struct LruCache<K, V, H = DefaultHashBuilder>
-where
- K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
- V: Clone + Debug + Send + 'static,
-{
- map: LinkedHashMap<K, V, H>,
- resource_counter: Box<dyn ResourceCounter<K = K, V = V>>,
-}
-
-impl<K, V> LruCache<K, V>
-where
- K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
- V: Clone + Debug + Send + 'static,
-{
- pub fn with_resource_counter<R>(resource_counter: R) -> Self
- where
- R: ResourceCounter<K = K, V = V>,
- {
- LruCache {
- map: LinkedHashMap::new(),
- resource_counter: Box::new(resource_counter),
- }
- }
-}
-
-impl<K, V, H> LruCache<K, V, H>
-where
- K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
- V: Clone + Debug + Send + 'static,
-{
- pub fn with_resource_counter_and_hasher<R>(
- resource_counter: R,
- hash_builder: H,
- ) -> Self
- where
- R: ResourceCounter<K = K, V = V>,
- {
- LruCache {
- map: LinkedHashMap::with_hasher(hash_builder),
- resource_counter: Box::new(resource_counter),
- }
- }
-
- pub fn len(&self) -> usize {
- self.map.len()
- }
-
- pub fn is_empty(&self) -> bool {
- self.map.is_empty()
- }
-
- pub fn iter(&self) -> Iter<K, V> {
- self.map.iter()
- }
-
- pub fn iter_mut(&mut self) -> IterMut<K, V> {
- self.map.iter_mut()
- }
-}
-
-impl<K, V, H> LruCachePolicy for LruCache<K, V, H>
-where
- K: 'static + Clone + Debug + Eq + Hash + Ord + Send,
- H: 'static + BuildHasher + Debug + Send,
- V: 'static + Clone + Debug + Send,
-{
- fn get_lru(&mut self, k: &Self::K) -> Option<Self::V> {
- match self.map.raw_entry_mut().from_key(k) {
- linked_hash_map::RawEntryMut::Occupied(mut occupied) => {
- occupied.to_back();
- Some(occupied.into_mut().clone())
- }
- linked_hash_map::RawEntryMut::Vacant(_) => None,
- }
- }
-
- fn put_lru(
- &mut self,
- k: Self::K,
- v: Self::V,
- ) -> CachePolicyPutResult<Self::K, Self::V> {
- let old_val = self.map.insert(k.clone(), v.clone());
- // Consume resources for (k, v)
- self.resource_counter.consume(&k, &v);
- // Restore resources for old (k, old_val)
- if let Some(old_val) = &old_val {
- self.resource_counter.restore(&k, old_val);
- }
-
- let mut popped_entries = vec![];
- while self.resource_counter.exceed_capacity() {
- if let Some(entry) = self.pop_lru() {
- popped_entries.push(entry);
- }
- }
- (old_val, popped_entries)
- }
-
- fn pop_lru(&mut self) -> Option<(Self::K, Self::V)> {
- if let Some(entry) = self.map.pop_front() {
- self.resource_counter.restore(&entry.0, &entry.1);
- Some(entry)
- } else {
- None
- }
- }
-}
-
-impl<K, V, H> CachePolicy for LruCache<K, V, H>
-where
- K: 'static + Clone + Debug + Eq + Hash + Ord + Send,
- H: 'static + BuildHasher + Debug + Send,
- V: 'static + Clone + Debug + Send,
-{
- type K = K;
- type V = V;
-
- fn get(&mut self, k: &Self::K) -> Option<Self::V> {
- self.get_lru(k)
- }
-
- fn peek(&mut self, k: &Self::K) -> Option<Self::V> {
- self.map.get(k).cloned()
- }
-
- fn put(&mut self, k: Self::K, v: Self::V) -> CachePolicyPutResult<Self::K,
Self::V> {
- self.put_lru(k, v)
- }
-
- fn remove(&mut self, k: &Self::K) -> Option<Self::V> {
- if let Some(v) = self.map.remove(k) {
- self.resource_counter.restore(k, &v);
- Some(v)
- } else {
- None
- }
- }
-
- fn pop(&mut self) -> Option<(Self::K, Self::V)> {
- self.pop_lru()
- }
-
- fn as_any(&self) -> &dyn Any {
- self
- }
-}
-
-impl<K, V, H> IntoIterator for LruCache<K, V, H>
-where
- K: 'static + Clone + Debug + Eq + Hash + Ord + Send,
- V: 'static + Clone + Debug + Send,
-{
- type Item = (K, V);
- type IntoIter = IntoIter<K, V>;
-
- fn into_iter(self) -> IntoIter<K, V> {
- self.map.into_iter()
- }
-}
-
-impl<'a, K, V, H> IntoIterator for &'a LruCache<K, V, H>
-where
- K: 'static + Clone + Debug + Eq + Hash + Ord + Send,
- V: 'static + Clone + Debug + Send,
-{
- type Item = (&'a K, &'a V);
- type IntoIter = Iter<'a, K, V>;
-
- fn into_iter(self) -> Iter<'a, K, V> {
- self.iter()
- }
-}
-
-impl<'a, K, V, H> IntoIterator for &'a mut LruCache<K, V, H>
-where
- K: 'static + Clone + Debug + Eq + Hash + Ord + Send,
- V: 'static + Clone + Debug + Send,
-{
- type Item = (&'a K, &'a mut V);
- type IntoIter = IterMut<'a, K, V>;
-
- fn into_iter(self) -> IterMut<'a, K, V> {
- self.iter_mut()
- }
-}
-
-impl<K, V, H> Debug for LruCache<K, V, H>
-where
- K: 'static + Clone + Debug + Eq + Hash + Ord + Send,
- V: 'static + Clone + Debug + Send,
-{
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- f.debug_map().entries(self.iter().rev()).finish()
- }
-}
-
-#[cfg(test)]
-mod tests {
- use crate::backend::policy::lru::lru_cache::LruCache;
- use crate::backend::policy::lru::{DefaultResourceCounter, ResourceCounter};
- use crate::backend::policy::CachePolicy;
- use hashbrown::HashMap;
-
- #[test]
- fn test_cache_with_lru_policy() {
- let mut cache =
LruCache::with_resource_counter(DefaultResourceCounter::new(3));
-
- cache.put("1".to_string(), "file1".to_string());
- cache.put("2".to_string(), "file2".to_string());
- cache.put("3".to_string(), "file3".to_string());
- assert_eq!(3, cache.len());
-
- cache.put("4".to_string(), "file4".to_string());
- assert_eq!(3, cache.len());
- assert!(cache.peek(&"1".to_string()).is_none());
-
- assert!(cache.peek(&"2".to_string()).is_some());
- let mut iter = cache.iter();
- assert_eq!("2", iter.next().unwrap().0);
- assert_eq!("3", iter.next().unwrap().0);
- assert_eq!("4", iter.next().unwrap().0);
-
- assert!(cache.get(&"2".to_string()).is_some());
- let mut iter = cache.iter();
- assert_eq!("3", iter.next().unwrap().0);
- assert_eq!("4", iter.next().unwrap().0);
- assert_eq!("2", iter.next().unwrap().0);
-
- assert_eq!(Some("file4".to_string()), cache.remove(&"4".to_string()));
-
- assert_eq!("3".to_string(), cache.pop().unwrap().0);
- assert_eq!("2".to_string(), cache.pop().unwrap().0);
- assert!(cache.pop().is_none());
- }
-
- #[test]
- fn test_cache_with_size_resource_counter() {
- let mut cache =
-
LruCache::with_resource_counter(get_test_size_resource_counter(50));
-
- cache.put("1".to_string(), "file1".to_string());
- cache.put("2".to_string(), "file2".to_string());
- cache.put("3".to_string(), "file3".to_string());
- assert_eq!(3, cache.len());
-
- cache.put("4".to_string(), "file4".to_string());
- assert_eq!(2, cache.len());
- assert!(cache.peek(&"1".to_string()).is_none());
- assert!(cache.peek(&"2".to_string()).is_none());
-
- assert!(cache.peek(&"3".to_string()).is_some());
- let mut iter = cache.iter();
- assert_eq!("3", iter.next().unwrap().0);
- assert_eq!("4", iter.next().unwrap().0);
-
- assert!(cache.get(&"3".to_string()).is_some());
- let mut iter = cache.iter();
- assert_eq!("4", iter.next().unwrap().0);
- assert_eq!("3", iter.next().unwrap().0);
-
- cache.put("5".to_string(), "file5".to_string());
- cache.put("3".to_string(), "file3-bak".to_string());
- cache.put("1".to_string(), "file1".to_string());
- let mut iter = cache.iter();
- assert_eq!("3", iter.next().unwrap().0);
- assert_eq!("1", iter.next().unwrap().0);
- assert!(iter.next().is_none());
- }
-
- fn get_test_size_resource_counter(max_size: usize) ->
TestSizeResourceCounter {
- let mut size_map = HashMap::new();
- size_map.insert(("1".to_string(), "file1".to_string()), 10);
- size_map.insert(("2".to_string(), "file2".to_string()), 20);
- size_map.insert(("3".to_string(), "file3".to_string()), 15);
- size_map.insert(("3".to_string(), "file3-bak".to_string()), 30);
- size_map.insert(("4".to_string(), "file4".to_string()), 35);
- size_map.insert(("5".to_string(), "file5".to_string()), 25);
-
- TestSizeResourceCounter {
- size_map,
- max_size,
- current_size: 0,
- }
- }
-
- #[derive(Debug)]
- struct TestSizeResourceCounter {
- size_map: HashMap<(String, String), usize>,
- max_size: usize,
- current_size: usize,
- }
-
- impl ResourceCounter for TestSizeResourceCounter {
- type K = String;
- type V = String;
-
- fn consume(&mut self, k: &Self::K, v: &Self::V) {
- let s = self.size_map.get(&(k.clone(), v.clone())).unwrap();
- self.current_size += s;
- }
-
- fn restore(&mut self, k: &Self::K, v: &Self::V) {
- let s = self.size_map.get(&(k.clone(), v.clone())).unwrap();
- self.current_size -= s;
- }
-
- fn exceed_capacity(&self) -> bool {
- self.current_size > self.max_size
- }
- }
-}
diff --git a/ballista/cache/src/backend/policy/lru/mod.rs
b/ballista/cache/src/backend/policy/lru/mod.rs
deleted file mode 100644
index a9b85569..00000000
--- a/ballista/cache/src/backend/policy/lru/mod.rs
+++ /dev/null
@@ -1,111 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-pub mod lru_cache;
-
-use crate::backend::policy::CachePolicyPutResult;
-use crate::backend::CachePolicy;
-use std::fmt::Debug;
-use std::hash::Hash;
-use std::marker::PhantomData;
-
-pub trait LruCachePolicy: CachePolicy {
- /// Retrieve the value for the given key,
- /// marking it as recently used and moving it to the back of the LRU list.
- fn get_lru(&mut self, k: &Self::K) -> Option<Self::V>;
-
- /// Put value for given key.
- ///
- /// If a key already exists, its old value will be returned.
- ///
- /// If necessary, will remove the value at the front of the LRU list to
make room.
- fn put_lru(
- &mut self,
- k: Self::K,
- v: Self::V,
- ) -> CachePolicyPutResult<Self::K, Self::V>;
-
- /// Remove the least recently used entry and return it.
- ///
- /// If the `LruCache` is empty this will return None.
- fn pop_lru(&mut self) -> Option<(Self::K, Self::V)>;
-}
-
-pub trait ResourceCounter: Debug + Send + 'static {
- /// Resource key.
- type K: Clone + Eq + Hash + Ord + Debug + Send + 'static;
-
- /// Resource value.
- type V: Clone + Debug + Send + 'static;
-
- /// Consume resource for a given key-value pair.
- fn consume(&mut self, k: &Self::K, v: &Self::V);
-
- /// Return resource for a given key-value pair.
- fn restore(&mut self, k: &Self::K, v: &Self::V);
-
- /// Check whether the current used resource exceeds the capacity
- fn exceed_capacity(&self) -> bool;
-}
-
-#[derive(Debug, Clone, Copy)]
-pub struct DefaultResourceCounter<K, V>
-where
- K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
- V: Clone + Debug + Send + 'static,
-{
- max_num: usize,
- current_num: usize,
- _key_marker: PhantomData<K>,
- _value_marker: PhantomData<V>,
-}
-
-impl<K, V> DefaultResourceCounter<K, V>
-where
- K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
- V: Clone + Debug + Send + 'static,
-{
- pub fn new(capacity: usize) -> Self {
- Self {
- max_num: capacity,
- current_num: 0,
- _key_marker: PhantomData,
- _value_marker: PhantomData,
- }
- }
-}
-
-impl<K, V> ResourceCounter for DefaultResourceCounter<K, V>
-where
- K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
- V: Clone + Debug + Send + 'static,
-{
- type K = K;
- type V = V;
-
- fn consume(&mut self, _k: &Self::K, _v: &Self::V) {
- self.current_num += 1;
- }
-
- fn restore(&mut self, _k: &Self::K, _v: &Self::V) {
- self.current_num -= 1;
- }
-
- fn exceed_capacity(&self) -> bool {
- self.current_num > self.max_num
- }
-}
diff --git a/ballista/cache/src/backend/policy/mod.rs
b/ballista/cache/src/backend/policy/mod.rs
deleted file mode 100644
index 26f17137..00000000
--- a/ballista/cache/src/backend/policy/mod.rs
+++ /dev/null
@@ -1,61 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::any::Any;
-use std::fmt::Debug;
-use std::hash::Hash;
-
-pub mod lru;
-
-pub type CachePolicyPutResult<K, V> = (Option<V>, Vec<(K, V)>);
-
-pub trait CachePolicy: Debug + Send + 'static {
- /// Cache key.
- type K: Clone + Eq + Hash + Ord + Debug + Send + 'static;
-
- /// Cached value.
- type V: Clone + Debug + Send + 'static;
-
- /// Get value for given key if it exists.
- fn get(&mut self, k: &Self::K) -> Option<Self::V>;
-
- /// Peek value for given key if it exists.
- ///
- /// In contrast to [`get`](Self::get) this will only return a value if
there is a stored value.
- /// This will not change the cache entries.
- fn peek(&mut self, k: &Self::K) -> Option<Self::V>;
-
- /// Put value for given key.
- ///
- /// If a key already exists, its old value will be returned.
- ///
- /// At the meanwhile, entries popped due to memory pressure will be
returned
- fn put(&mut self, k: Self::K, v: Self::V) -> CachePolicyPutResult<Self::K,
Self::V>;
-
- /// Remove value for given key.
- ///
- /// If a key does not exist, none will be returned.
- fn remove(&mut self, k: &Self::K) -> Option<Self::V>;
-
- /// Remove an entry from the cache due to memory pressure or expiration.
- ///
- /// If the cache is empty, none will be returned.
- fn pop(&mut self) -> Option<(Self::K, Self::V)>;
-
- /// Return backend as [`Any`] which can be used to downcast to a specific
implementation.
- fn as_any(&self) -> &dyn Any;
-}
diff --git a/ballista/cache/src/lib.rs b/ballista/cache/src/lib.rs
deleted file mode 100644
index 1cba039c..00000000
--- a/ballista/cache/src/lib.rs
+++ /dev/null
@@ -1,54 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::backend::policy::CachePolicy;
-use crate::backend::CacheBackend;
-use crate::listener::{
- cache_policy::CachePolicyWithListener,
loading_cache::LoadingCacheWithListener,
-};
-use crate::loading_cache::{driver::CacheDriver, loader::CacheLoader};
-use std::fmt::Debug;
-use std::hash::Hash;
-use std::sync::Arc;
-
-pub mod backend;
-pub mod listener;
-pub mod loading_cache;
-pub mod metrics;
-
-pub type DefaultLoadingCache<K, V, L> =
LoadingCacheWithListener<CacheDriver<K, V, L>>;
-pub type LoadingCacheMetrics<K, V> = metrics::loading_cache::Metrics<K, V>;
-
-pub fn create_loading_cache_with_metrics<K, V, L>(
- policy: impl CachePolicy<K = K, V = V>,
- loader: Arc<L>,
-) -> (DefaultLoadingCache<K, V, L>, Arc<LoadingCacheMetrics<K, V>>)
-where
- K: Clone + Eq + Hash + Debug + Ord + Send + 'static,
- V: Clone + Debug + Send + 'static,
- L: CacheLoader<K = K, V = V>,
-{
- let metrics = Arc::new(metrics::loading_cache::Metrics::new());
-
- let policy_with_metrics = CachePolicyWithListener::new(policy,
vec![metrics.clone()]);
- let cache_backend = CacheBackend::new(policy_with_metrics);
- let loading_cache = CacheDriver::new(cache_backend, loader);
- let loading_cache_with_metrics =
- LoadingCacheWithListener::new(loading_cache, vec![metrics.clone()]);
-
- (loading_cache_with_metrics, metrics)
-}
diff --git a/ballista/cache/src/listener/cache_policy.rs
b/ballista/cache/src/listener/cache_policy.rs
deleted file mode 100644
index 83917ea8..00000000
--- a/ballista/cache/src/listener/cache_policy.rs
+++ /dev/null
@@ -1,133 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::backend::policy::{CachePolicy, CachePolicyPutResult};
-use std::any::Any;
-use std::fmt::Debug;
-use std::hash::Hash;
-use std::sync::Arc;
-
-pub trait CachePolicyListener: Debug + Send + Sync + 'static {
- /// Cache key.
- type K: Clone + Eq + Hash + Debug + Ord + Send + 'static;
-
- /// Cache value.
- type V: Clone + Debug + Send + 'static;
-
- fn listen_on_get(&self, k: Self::K, v: Option<Self::V>);
-
- fn listen_on_peek(&self, k: Self::K, v: Option<Self::V>);
-
- fn listen_on_put(&self, k: Self::K, v: Self::V, old_v: Option<Self::V>);
-
- fn listen_on_remove(&self, k: Self::K, v: Option<Self::V>);
-
- fn listen_on_pop(&self, entry: (Self::K, Self::V));
-}
-
-#[derive(Debug)]
-pub struct CachePolicyWithListener<P>
-where
- P: CachePolicy,
-{
- inner: P,
- listeners: Vec<Arc<dyn CachePolicyListener<K = P::K, V = P::V>>>,
-}
-
-impl<P> CachePolicyWithListener<P>
-where
- P: CachePolicy,
-{
- pub fn new(
- inner: P,
- listeners: Vec<Arc<dyn CachePolicyListener<K = P::K, V = P::V>>>,
- ) -> Self {
- Self { inner, listeners }
- }
-}
-
-impl<P> CachePolicy for CachePolicyWithListener<P>
-where
- P: CachePolicy,
-{
- type K = P::K;
- type V = P::V;
-
- fn get(&mut self, k: &Self::K) -> Option<Self::V> {
- let v = self.inner.get(k);
-
- // For listeners
- self.listeners
- .iter()
- .for_each(|listener| listener.listen_on_get(k.clone(),
v.as_ref().cloned()));
-
- v
- }
-
- fn peek(&mut self, k: &Self::K) -> Option<Self::V> {
- let v = self.inner.peek(k);
-
- // For listeners
- self.listeners
- .iter()
- .for_each(|listener| listener.listen_on_peek(k.clone(),
v.as_ref().cloned()));
-
- v
- }
-
- fn put(&mut self, k: Self::K, v: Self::V) -> CachePolicyPutResult<Self::K,
Self::V> {
- let ret = self.inner.put(k.clone(), v.clone());
-
- // For listeners
- self.listeners.iter().for_each(|listener| {
- listener.listen_on_put(k.clone(), v.clone(),
ret.0.as_ref().cloned());
- ret.1
- .iter()
- .for_each(|entry| listener.listen_on_pop(entry.clone()));
- });
-
- ret
- }
-
- fn remove(&mut self, k: &Self::K) -> Option<Self::V> {
- let v = self.inner.remove(k);
-
- // For listeners
- self.listeners.iter().for_each(|listener| {
- listener.listen_on_remove(k.clone(), v.as_ref().cloned())
- });
-
- v
- }
-
- fn pop(&mut self) -> Option<(Self::K, Self::V)> {
- let entry = self.inner.pop();
-
- // For listeners
- if let Some(entry) = &entry {
- self.listeners
- .iter()
- .for_each(|listener| listener.listen_on_pop(entry.clone()));
- }
-
- entry
- }
-
- fn as_any(&self) -> &dyn Any {
- self
- }
-}
diff --git a/ballista/cache/src/listener/loading_cache.rs
b/ballista/cache/src/listener/loading_cache.rs
deleted file mode 100644
index 09f61c3b..00000000
--- a/ballista/cache/src/listener/loading_cache.rs
+++ /dev/null
@@ -1,197 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::loading_cache::{CacheGetStatus, LoadingCache};
-use async_trait::async_trait;
-use std::fmt::Debug;
-use std::hash::Hash;
-use std::sync::Arc;
-
-pub trait LoadingCacheListener: Debug + Send + Sync + 'static {
- /// Cache key.
- type K: Clone + Eq + Hash + Debug + Ord + Send + 'static;
-
- /// Cache value.
- type V: Clone + Debug + Send + 'static;
-
- fn listen_on_get_if_present(&self, k: Self::K, v: Option<Self::V>);
-
- fn listen_on_get(&self, k: Self::K, v: Self::V, status: CacheGetStatus);
-
- fn listen_on_put(&self, k: Self::K, v: Self::V);
-
- fn listen_on_invalidate(&self, k: Self::K);
-
- fn listen_on_get_cancelling(&self, k: Self::K);
-}
-
-#[derive(Debug)]
-pub struct LoadingCacheWithListener<L>
-where
- L: LoadingCache,
-{
- inner: L,
- listeners: Vec<Arc<dyn LoadingCacheListener<K = L::K, V = L::V>>>,
-}
-
-impl<L> LoadingCacheWithListener<L>
-where
- L: LoadingCache,
-{
- pub fn new(
- inner: L,
- listeners: Vec<Arc<dyn LoadingCacheListener<K = L::K, V = L::V>>>,
- ) -> Self {
- Self { inner, listeners }
- }
-}
-
-#[async_trait]
-impl<L> LoadingCache for LoadingCacheWithListener<L>
-where
- L: LoadingCache,
-{
- type K = L::K;
- type V = L::V;
- type GetExtra = L::GetExtra;
-
- fn get_if_present(&self, k: Self::K) -> Option<Self::V> {
- let v = self.inner.get_if_present(k.clone());
- self.listen_on_get_if_present(k, v.as_ref().cloned());
- v
- }
-
- async fn get_with_status(
- &self,
- k: Self::K,
- extra: Self::GetExtra,
- ) -> (Self::V, CacheGetStatus) {
- let mut set_on_drop = SetGetListenerOnDrop::new(self, k.clone());
- let (v, status) = self.inner.get_with_status(k, extra).await;
- set_on_drop.get_result = Some((v.clone(), status));
- (v, status)
- }
-
- async fn put(&self, k: Self::K, v: Self::V) {
- let k_captured = k.clone();
- let v_captured = v.clone();
- self.inner.put(k_captured, v_captured).await;
- self.listen_on_put(k, v);
- }
-
- fn invalidate(&self, k: Self::K) {
- self.inner.invalidate(k.clone());
- self.listen_on_invalidate(k);
- }
-}
-
-struct SetGetListenerOnDrop<'a, L>
-where
- L: LoadingCache,
-{
- listener: &'a LoadingCacheWithListener<L>,
- key: L::K,
- get_result: Option<(L::V, CacheGetStatus)>,
-}
-
-impl<'a, L> SetGetListenerOnDrop<'a, L>
-where
- L: LoadingCache,
-{
- fn new(listener: &'a LoadingCacheWithListener<L>, key: L::K) -> Self {
- Self {
- listener,
- key,
- get_result: None,
- }
- }
-}
-
-impl<'a, L> Drop for SetGetListenerOnDrop<'a, L>
-where
- L: LoadingCache,
-{
- fn drop(&mut self) {
- if let Some((value, status)) = &self.get_result {
- self.listener
- .listen_on_get(self.key.clone(), value.clone(), *status)
- } else {
- self.listener.listen_on_get_cancelling(self.key.clone());
- }
- }
-}
-
-impl<L> LoadingCacheListener for LoadingCacheWithListener<L>
-where
- L: LoadingCache,
-{
- type K = L::K;
- type V = L::V;
-
- fn listen_on_get_if_present(&self, k: Self::K, v: Option<Self::V>) {
- if self.listeners.len() == 1 {
- self.listeners
- .first()
- .unwrap()
- .listen_on_get_if_present(k, v);
- } else {
- self.listeners.iter().for_each(|listener| {
- listener.listen_on_get_if_present(k.clone(),
v.as_ref().cloned())
- });
- }
- }
-
- fn listen_on_get(&self, k: Self::K, v: Self::V, status: CacheGetStatus) {
- if self.listeners.len() == 1 {
- self.listeners.first().unwrap().listen_on_get(k, v, status);
- } else {
- self.listeners.iter().for_each(|listener| {
- listener.listen_on_get(k.clone(), v.clone(), status)
- });
- }
- }
-
- fn listen_on_put(&self, k: Self::K, v: Self::V) {
- if self.listeners.len() == 1 {
- self.listeners.first().unwrap().listen_on_put(k, v);
- } else {
- self.listeners
- .iter()
- .for_each(|listener| listener.listen_on_put(k.clone(),
v.clone()));
- }
- }
-
- fn listen_on_invalidate(&self, k: Self::K) {
- if self.listeners.len() == 1 {
- self.listeners.first().unwrap().listen_on_invalidate(k);
- } else {
- self.listeners
- .iter()
- .for_each(|listener| listener.listen_on_invalidate(k.clone()));
- }
- }
-
- fn listen_on_get_cancelling(&self, k: Self::K) {
- if self.listeners.len() == 1 {
- self.listeners.first().unwrap().listen_on_get_cancelling(k);
- } else {
- self.listeners
- .iter()
- .for_each(|listener|
listener.listen_on_get_cancelling(k.clone()));
- }
- }
-}
diff --git a/ballista/cache/src/listener/mod.rs
b/ballista/cache/src/listener/mod.rs
deleted file mode 100644
index 670362e1..00000000
--- a/ballista/cache/src/listener/mod.rs
+++ /dev/null
@@ -1,19 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-pub mod cache_policy;
-pub mod loading_cache;
diff --git a/ballista/cache/src/loading_cache/cancellation_safe_future.rs
b/ballista/cache/src/loading_cache/cancellation_safe_future.rs
deleted file mode 100644
index 73017034..00000000
--- a/ballista/cache/src/loading_cache/cancellation_safe_future.rs
+++ /dev/null
@@ -1,179 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::{
- future::Future,
- pin::Pin,
- sync::Arc,
- task::{Context, Poll},
-};
-
-use futures::future::BoxFuture;
-use parking_lot::Mutex;
-use tokio::task::JoinHandle;
-
-/// Wrapper around a future that cannot be cancelled.
-///
-/// When the future is dropped/cancelled, we'll spawn a tokio task to _rescue_
it.
-pub struct CancellationSafeFuture<F>
-where
- F: Future + Send + 'static,
- F::Output: Send,
-{
- /// Mark if the inner future finished. If not, we must spawn a helper task
on drop.
- done: bool,
-
- /// Inner future.
- ///
- /// Wrapped in an `Option` so we can extract it during drop. Inside that
option however we also need a pinned
- /// box because once this wrapper is polled, it will be pinned in memory
-- even during drop. Now the inner
- /// future does not necessarily implement `Unpin`, so we need a heap
allocation to pin it in memory even when we
- /// move it out of this option.
- inner: Option<BoxFuture<'static, F::Output>>,
-
- /// Where to store the join handle on drop.
- receiver: Arc<Mutex<Option<JoinHandle<F::Output>>>>,
-}
-
-impl<F> Drop for CancellationSafeFuture<F>
-where
- F: Future + Send + 'static,
- F::Output: Send,
-{
- fn drop(&mut self) {
- if !self.done {
- // acquire lock BEFORE checking the Arc
- let mut receiver = self.receiver.lock();
- assert!(receiver.is_none());
-
- // The Mutex is owned by the Arc and cannot be moved out of it. So
after we acquired the lock we can safely
- // check if any external party still has access to the receiver
state. If not, we assume there is no
- // interest in this future at all (e.g. during shutdown) and will
NOT spawn it.
- if Arc::strong_count(&self.receiver) > 1 {
- let inner = self.inner.take().expect("Double-drop?");
- let handle = tokio::task::spawn(inner);
- *receiver = Some(handle);
- }
- }
- }
-}
-
-impl<F> CancellationSafeFuture<F>
-where
- F: Future + Send,
- F::Output: Send,
-{
- /// Create new future that is protected from cancellation.
- ///
- /// If [`CancellationSafeFuture`] is cancelled (i.e. dropped) and there is
still some external receiver of the state
- /// left, than we will drive the payload (`f`) to completion. Otherwise
`f` will be cancelled.
- pub fn new(fut: F, receiver: Arc<Mutex<Option<JoinHandle<F::Output>>>>) ->
Self {
- Self {
- done: false,
- inner: Some(Box::pin(fut)),
- receiver,
- }
- }
-}
-
-impl<F> Future for CancellationSafeFuture<F>
-where
- F: Future + Send,
- F::Output: Send,
-{
- type Output = F::Output;
-
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) ->
Poll<Self::Output> {
- assert!(!self.done, "Polling future that already returned");
-
- match self.inner.as_mut().expect("not dropped").as_mut().poll(cx) {
- Poll::Ready(res) => {
- self.done = true;
- Poll::Ready(res)
- }
- Poll::Pending => Poll::Pending,
- }
- }
-}
-
-#[cfg(test)]
-mod tests {
- use std::{
- sync::atomic::{AtomicBool, Ordering},
- time::Duration,
- };
-
- use tokio::sync::Barrier;
-
- use super::*;
-
- #[tokio::test]
- async fn test_happy_path() {
- let done = Arc::new(AtomicBool::new(false));
- let done_captured = Arc::clone(&done);
-
- let receiver = Default::default();
- let fut = CancellationSafeFuture::new(
- async move {
- done_captured.store(true, Ordering::SeqCst);
- },
- receiver,
- );
-
- fut.await;
-
- assert!(done.load(Ordering::SeqCst));
- }
-
- #[tokio::test]
- async fn test_cancel_future() {
- let done = Arc::new(Barrier::new(2));
- let done_captured = Arc::clone(&done);
-
- let receiver = Default::default();
- let fut = CancellationSafeFuture::new(
- async move {
- done_captured.wait().await;
- },
- Arc::clone(&receiver),
- );
-
- drop(fut);
-
- tokio::time::timeout(Duration::from_secs(5), done.wait())
- .await
- .unwrap();
- }
-
- #[tokio::test]
- async fn test_receiver_gone() {
- let done = Arc::new(Barrier::new(2));
- let done_captured = Arc::clone(&done);
-
- let receiver = Default::default();
- let fut = CancellationSafeFuture::new(
- async move {
- done_captured.wait().await;
- },
- receiver,
- );
-
- drop(fut);
-
- assert_eq!(Arc::strong_count(&done), 1);
- }
-}
diff --git a/ballista/cache/src/loading_cache/driver.rs
b/ballista/cache/src/loading_cache/driver.rs
deleted file mode 100644
index 614c6ead..00000000
--- a/ballista/cache/src/loading_cache/driver.rs
+++ /dev/null
@@ -1,573 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Main data structure, see [`CacheDriver`].
-
-use crate::backend::CacheBackend;
-use crate::loading_cache::{
- cancellation_safe_future::CancellationSafeFuture,
- loader::CacheLoader,
- {CacheGetStatus, LoadingCache},
-};
-use async_trait::async_trait;
-use futures::future::{BoxFuture, Shared};
-use futures::{FutureExt, TryFutureExt};
-use log::debug;
-use parking_lot::Mutex;
-use std::collections::HashMap;
-use std::fmt::Debug;
-use std::future::Future;
-use std::hash::Hash;
-use std::sync::Arc;
-use tokio::{
- sync::oneshot::{error::RecvError, Sender},
- task::JoinHandle,
-};
-
-/// Combine a [`CacheBackend`] and a [`Loader`] into a single [`Cache`]
-#[derive(Debug)]
-pub struct CacheDriver<K, V, L>
-where
- K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
- V: Clone + Debug + Send + 'static,
- L: CacheLoader<K = K, V = V>,
-{
- state: Arc<Mutex<CacheState<K, V>>>,
- loader: Arc<L>,
-}
-
-impl<K, V, L> CacheDriver<K, V, L>
-where
- K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
- V: Clone + Debug + Send + 'static,
- L: CacheLoader<K = K, V = V>,
-{
- /// Create new, empty cache with given loader function.
- pub fn new(backend: CacheBackend<K, V>, loader: Arc<L>) -> Self {
- Self {
- state: Arc::new(Mutex::new(CacheState {
- cached_entries: backend,
- loaders: HashMap::new(),
- next_loader_tag: 0,
- })),
- loader,
- }
- }
-}
-
-#[async_trait]
-impl<K, V, L> LoadingCache for CacheDriver<K, V, L>
-where
- K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
- V: Clone + Debug + Send + 'static,
- L: CacheLoader<K = K, V = V>,
-{
- type K = K;
- type V = V;
- type GetExtra = L::Extra;
-
- fn get_if_present(&self, k: Self::K) -> Option<Self::V> {
- self.state.lock().cached_entries.get(&k)
- }
-
- async fn get_with_status(
- &self,
- k: Self::K,
- extra: Self::GetExtra,
- ) -> (Self::V, CacheGetStatus) {
- // place state locking into its own scope so it doesn't leak into the
generator (async
- // function)
- let (fut, receiver, status) = {
- let mut state = self.state.lock();
-
- // check if the entry has already been cached
- if let Some(v) = state.cached_entries.get(&k) {
- return (v, CacheGetStatus::Hit);
- }
-
- // check if there is already a running loader for this key
- if let Some(loader) = state.loaders.get(&k) {
- (
- None,
- loader.recv.clone(),
- CacheGetStatus::MissAlreadyLoading,
- )
- } else {
- // generate unique tag
- let loader_tag = state.next_loader_tag();
-
- // requires new loader
- let (fut, loader) = create_value_loader(
- self.state.clone(),
- self.loader.clone(),
- loader_tag,
- k.clone(),
- extra,
- );
-
- let receiver = loader.recv.clone();
- state.loaders.insert(k, loader);
-
- (Some(fut), receiver, CacheGetStatus::Miss)
- }
- };
-
- // try to run the loader future in this very task context to avoid
spawning tokio tasks (which adds latency and
- // overhead)
- if let Some(fut) = fut {
- fut.await;
- }
-
- let v = retrieve_from_shared(receiver).await;
-
- (v, status)
- }
-
- async fn put(&self, k: Self::K, v: Self::V) {
- let maybe_join_handle = {
- let mut state = self.state.lock();
-
- let maybe_recv = if let Some(loader) = state.loaders.remove(&k) {
- // it's OK when the receiver side is gone (likely panicked)
- loader.set.send(v.clone()).ok();
-
- // When we side-load data into the running task, the task does
NOT modify the
- // backend, so we have to do that. The reason for not letting
the task feed the
- // side-loaded data back into `cached_entries` is that we
would need to drop the
- // state lock here before the task could acquire it, leading
to a lock gap.
- Some(loader.recv)
- } else {
- None
- };
-
- state.cached_entries.put(k, v);
-
- maybe_recv
- };
-
- // drive running loader (if any) to completion
- if let Some(recv) = maybe_join_handle {
- // we do not care if the loader died (e.g. due to a panic)
- recv.await.ok();
- }
- }
-
- fn invalidate(&self, k: Self::K) {
- let mut state = self.state.lock();
-
- if state.loaders.remove(&k).is_some() {
- debug!("Running loader for key {:?} is removed", k);
- }
-
- state.cached_entries.remove(&k);
- }
-}
-
-impl<K, V, L> Drop for CacheDriver<K, V, L>
-where
- K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
- V: Clone + Debug + Send + 'static,
- L: CacheLoader<K = K, V = V>,
-{
- fn drop(&mut self) {
- for (_k, loader) in self.state.lock().loaders.drain() {
- // It's unlikely that anyone is still using the shared receiver at
this point, because
- // `Cache::get` borrows the `self`. If it is still in use,
aborting the task will
- // cancel the contained future which in turn will drop the sender
of the oneshot
- // channel. The receivers will be notified.
- let handle = loader.join_handle.lock();
- if let Some(handle) = handle.as_ref() {
- handle.abort();
- }
- }
- }
-}
-
-fn create_value_loader<K, V, Extra>(
- state: Arc<Mutex<CacheState<K, V>>>,
- loader: Arc<dyn CacheLoader<K = K, V = V, Extra = Extra>>,
- loader_tag: u64,
- k: K,
- extra: Extra,
-) -> (
- CancellationSafeFuture<impl Future<Output = ()>>,
- ValueLoader<V>,
-)
-where
- K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
- V: Clone + Debug + Send + 'static,
- Extra: Debug + Send + 'static,
-{
- let (tx_main, rx_main) = tokio::sync::oneshot::channel();
- let receiver = rx_main
- .map_ok(|v| Arc::new(Mutex::new(v)))
- .map_err(Arc::new)
- .boxed()
- .shared();
- let (tx_set, rx_set) = tokio::sync::oneshot::channel();
-
- // need to wrap the loader into a `CancellationSafeFuture` so that it
doesn't get cancelled when
- // this very request is cancelled
- let join_handle_receiver = Arc::new(Mutex::new(None));
- let fut = async move {
- let loader_fut = async move {
- let mut submitter = ResultSubmitter::new(state, k.clone(),
loader_tag);
-
- // execute the loader
- // If we panic here then `tx` will be dropped and the receivers
will be
- // notified.
- let v = loader.load(k, extra).await;
-
- // remove "running" state and store result
- let was_running = submitter.submit(v.clone());
-
- if !was_running {
- // value was side-loaded, so we cannot populate `v`. Instead
block this
- // execution branch and wait for `rx_set` to deliver the
side-loaded
- // result.
- loop {
- tokio::task::yield_now().await;
- }
- }
-
- v
- };
-
- // prefer the side-loader
- let v = futures::select_biased! {
- maybe_v = rx_set.fuse() => {
- match maybe_v {
- Ok(v) => {
- // data get side-loaded via `Cache::set`. In this
case, we do
- // NOT modify the state because there would be a
lock-gap. The
- // `set` function will do that for us instead.
- v
- }
- Err(_) => {
- // sender side is gone, very likely the cache is
shutting down
- debug!(
- "Sender for side-loading data into running loader
gone.",
- );
- return;
- }
- }
- }
- v = loader_fut.fuse() => v,
- };
-
- // broadcast result
- // It's OK if the receiver side is gone. This might happen during
shutdown
- tx_main.send(v).ok();
- };
- let fut = CancellationSafeFuture::new(fut,
Arc::clone(&join_handle_receiver));
-
- (
- fut,
- ValueLoader {
- recv: receiver,
- set: tx_set,
- join_handle: join_handle_receiver,
- tag: loader_tag,
- },
- )
-}
-
-/// Inner cache state that is usually guarded by a lock.
-///
-/// The state parts must be updated in a consistent manner, i.e. while using
the same lock guard.
-#[derive(Debug)]
-struct CacheState<K, V>
-where
- K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
- V: Clone + Debug + Send + 'static,
-{
- /// Cached entries (i.e. queries completed).
- cached_entries: CacheBackend<K, V>,
-
- /// Currently value loaders indexed by cache key.
- loaders: HashMap<K, ValueLoader<V>>,
-
- /// Tag used for the next value loader to distinguish loaders for the same
key
- /// (e.g. when starting, side-loading, starting again)
- next_loader_tag: u64,
-}
-
-impl<K, V> CacheState<K, V>
-where
- K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
- V: Clone + Debug + Send + 'static,
-{
- /// To avoid overflow issue, it will begin from 0. It will rarely happen
that
- /// two value loaders share the same key and tag while for different
purposes
- #[inline]
- fn next_loader_tag(&mut self) -> u64 {
- let ret = self.next_loader_tag;
- if self.next_loader_tag != u64::MAX {
- self.next_loader_tag += 1;
- } else {
- self.next_loader_tag = 0;
- }
- ret
- }
-}
-
-/// State for coordinating the execution of a single value loader.
-#[derive(Debug)]
-struct ValueLoader<V> {
- /// A receiver that can await the result.
- recv: SharedReceiver<V>,
-
- /// A sender that enables setting entries while the query is running.
- set: Sender<V>,
-
- /// A handle for the task that is currently loading the value.
- ///
- /// The handle can be used to abort the loading, e.g. when dropping the
cache.
- join_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
-
- /// Tag so that loaders for the same key (e.g. when starting,
side-loading, starting again) can
- /// be told apart.
- tag: u64,
-}
-
-/// A [`tokio::sync::oneshot::Receiver`] that can be cloned.
-///
-/// The types are:
-///
-/// - `Arc<Mutex<V>>`: Ensures that we can clone `V` without requiring `V:
Sync`. At the same time
-/// the reference to `V` (i.e. the `Arc`) must be cloneable for `Shared`
-/// - `Arc<RecvError>`: Is required because `RecvError` is not `Clone` but
`Shared` requires that.
-/// - `BoxFuture`: The transformation from `Result<V, RecvError>` to
`Result<Arc<Mutex<V>>,
-/// Arc<RecvError>>` results in a kinda messy type and we wanna erase that.
-/// - `Shared`: Allow the receiver to be cloned and be awaited from multiple
places.
-type SharedReceiver<V> =
- Shared<BoxFuture<'static, Result<Arc<Mutex<V>>, Arc<RecvError>>>>;
-
-/// Retrieve data from shared receiver.
-async fn retrieve_from_shared<V>(receiver: SharedReceiver<V>) -> V
-where
- V: Clone + Send,
-{
- receiver
- .await
- .expect("cache loader panicked, see logs")
- .lock()
- .clone()
-}
-
-/// Helper to submit results of running queries.
-///
-/// Ensures that running loader is removed when dropped (e.g. during panic).
-struct ResultSubmitter<K, V>
-where
- K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
- V: Clone + Debug + Send + 'static,
-{
- state: Arc<Mutex<CacheState<K, V>>>,
- tag: u64,
- k: Option<K>,
- v: Option<V>,
-}
-
-impl<K, V> ResultSubmitter<K, V>
-where
- K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
- V: Clone + Debug + Send + 'static,
-{
- fn new(state: Arc<Mutex<CacheState<K, V>>>, k: K, tag: u64) -> Self {
- Self {
- state,
- tag,
- k: Some(k),
- v: None,
- }
- }
-
- /// Submit value.
- ///
- /// Returns `true` if this very loader was running.
- fn submit(&mut self, v: V) -> bool {
- assert!(self.v.is_none());
- self.v = Some(v);
- self.finalize()
- }
-
- /// Finalize request.
- ///
- /// Returns `true` if this very loader was running.
- fn finalize(&mut self) -> bool {
- let k = self.k.take().expect("finalized twice");
- let mut state = self.state.lock();
-
- match state.loaders.get(&k) {
- Some(loader) if loader.tag == self.tag => {
- state.loaders.remove(&k);
-
- if let Some(v) = self.v.take() {
- // this very loader is in charge of the key, so store in
in the
- // underlying cache
- state.cached_entries.put(k, v);
- }
-
- true
- }
- _ => {
- // This loader is actually not really running any longer but
got
- // shut down, e.g. due to side loading. Do NOT store the
- // generated value in the underlying cache.
-
- false
- }
- }
- }
-}
-
-impl<K, V> Drop for ResultSubmitter<K, V>
-where
- K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
- V: Clone + Debug + Send + 'static,
-{
- fn drop(&mut self) {
- if self.k.is_some() {
- // not finalized yet
- self.finalize();
- }
- }
-}
-
-#[cfg(test)]
-mod tests {
-
- use crate::backend::policy::lru::lru_cache::LruCache;
- use crate::listener::cache_policy::CachePolicyListener;
- use crate::{CacheBackend, CacheDriver, CacheLoader,
CachePolicyWithListener};
-
- use crate::backend::policy::lru::DefaultResourceCounter;
- use crate::loading_cache::LoadingCache;
- use async_trait::async_trait;
- use parking_lot::Mutex;
- use std::sync::mpsc::{channel, Sender};
- use std::sync::Arc;
-
- #[tokio::test]
- async fn test_removal_entries() {
- let cache_policy =
- LruCache::with_resource_counter(DefaultResourceCounter::new(3));
- let loader = TestStringCacheLoader {
- prefix: "file".to_string(),
- };
- let (sender, receiver) = channel::<(String, String)>();
- let listener = Arc::new(EntryRemovalListener::new(sender));
- let policy_with_listener =
- CachePolicyWithListener::new(cache_policy, vec![listener.clone()]);
- let cache_backend = CacheBackend::new(policy_with_listener);
- let loading_cache = CacheDriver::new(cache_backend, Arc::new(loader));
-
- assert_eq!(
- "file1".to_string(),
- loading_cache.get("1".to_string(), ()).await
- );
- assert_eq!(
- "file2".to_string(),
- loading_cache.get("2".to_string(), ()).await
- );
- assert_eq!(
- "file3".to_string(),
- loading_cache.get("3".to_string(), ()).await
- );
- assert_eq!(
- "file4".to_string(),
- loading_cache.get("4".to_string(), ()).await
- );
- assert_eq!(Ok(("1".to_string(), "file1".to_string())),
receiver.recv());
- assert!(loading_cache.get_if_present("1".to_string()).is_none());
-
- loading_cache
- .put("2".to_string(), "file2-bak".to_string())
- .await;
- assert_eq!(
- "file5".to_string(),
- loading_cache.get("5".to_string(), ()).await
- );
- assert_eq!(Ok(("3".to_string(), "file3".to_string())),
receiver.recv());
- assert!(loading_cache.get_if_present("3".to_string()).is_none());
- assert!(loading_cache.get_if_present("2".to_string()).is_some());
-
- loading_cache.invalidate("2".to_string());
- assert_eq!(
- Ok(("2".to_string(), "file2-bak".to_string())),
- receiver.recv()
- );
- assert!(loading_cache.get_if_present("2".to_string()).is_none());
- }
-
- #[derive(Debug)]
- struct EntryRemovalListener {
- sender: Arc<Mutex<Sender<(String, String)>>>,
- }
-
- impl EntryRemovalListener {
- pub fn new(sender: Sender<(String, String)>) -> Self {
- Self {
- sender: Arc::new(Mutex::new(sender)),
- }
- }
- }
-
- impl CachePolicyListener for EntryRemovalListener {
- type K = String;
- type V = String;
-
- fn listen_on_get(&self, _k: Self::K, _v: Option<Self::V>) {
- // Do nothing
- }
-
- fn listen_on_peek(&self, _k: Self::K, _v: Option<Self::V>) {
- // Do nothing
- }
-
- fn listen_on_put(&self, _k: Self::K, _v: Self::V, _old_v:
Option<Self::V>) {
- // Do nothing
- }
-
- fn listen_on_remove(&self, k: Self::K, v: Option<Self::V>) {
- if let Some(v) = v {
- self.sender.lock().send((k, v)).unwrap();
- }
- }
-
- fn listen_on_pop(&self, entry: (Self::K, Self::V)) {
- self.sender.lock().send(entry).unwrap();
- }
- }
-
- #[derive(Debug)]
- struct TestStringCacheLoader {
- prefix: String,
- }
-
- #[async_trait]
- impl CacheLoader for TestStringCacheLoader {
- type K = String;
- type V = String;
- type Extra = ();
-
- async fn load(&self, k: Self::K, _extra: Self::Extra) -> Self::V {
- format!("{}{k}", self.prefix)
- }
- }
-}
diff --git a/ballista/cache/src/loading_cache/loader.rs
b/ballista/cache/src/loading_cache/loader.rs
deleted file mode 100644
index 8987d88b..00000000
--- a/ballista/cache/src/loading_cache/loader.rs
+++ /dev/null
@@ -1,52 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use async_trait::async_trait;
-use std::fmt::Debug;
-use std::hash::Hash;
-
-/// Loader for missing [`Cache`](crate::cache::Cache) entries.
-#[async_trait]
-pub trait CacheLoader: Debug + Send + Sync + 'static {
- /// Cache key.
- type K: Debug + Hash + Send + 'static;
-
- /// Cache value.
- type V: Debug + Send + 'static;
-
- /// Extra data needed when loading a missing entry. Specify `()` if not
needed.
- type Extra: Debug + Send + 'static;
-
- /// Load value for given key, using the extra data if needed.
- async fn load(&self, k: Self::K, extra: Self::Extra) -> Self::V;
-}
-
-#[async_trait]
-impl<K, V, Extra> CacheLoader for Box<dyn CacheLoader<K = K, V = V, Extra =
Extra>>
-where
- K: Debug + Hash + Send + 'static,
- V: Debug + Send + 'static,
- Extra: Debug + Send + 'static,
-{
- type K = K;
- type V = V;
- type Extra = Extra;
-
- async fn load(&self, k: Self::K, extra: Self::Extra) -> Self::V {
- self.as_ref().load(k, extra).await
- }
-}
diff --git a/ballista/cache/src/loading_cache/mod.rs
b/ballista/cache/src/loading_cache/mod.rs
deleted file mode 100644
index bf6f0360..00000000
--- a/ballista/cache/src/loading_cache/mod.rs
+++ /dev/null
@@ -1,113 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-/// It's a fork version from the
influxdb_iox::cache_system[https://github.com/influxdata/influxdb_iox].
-/// Later we will propose to influxdb team to make this cache part more
general.
-mod cancellation_safe_future;
-pub mod driver;
-pub mod loader;
-
-use async_trait::async_trait;
-use std::fmt::Debug;
-use std::hash::Hash;
-
-/// High-level loading cache interface.
-///
-/// Cache entries are manually added using get(Key, GetExtra) or put(Key,
Value),
-/// and are stored in the cache until either evicted or manually invalidated.
-///
-/// # Concurrency
-///
-/// Multiple cache requests for different keys can run at the same time. When
data is requested for
-/// the same key, the underlying loader will only be polled once, even when
the requests are made
-/// while the loader is still running.
-///
-/// # Cancellation
-///
-/// Canceling a [`get`](Self::get) request will NOT cancel the underlying
loader. The data will
-/// still be cached.
-#[async_trait]
-pub trait LoadingCache: Debug + Send + Sync + 'static {
- /// Cache key.
- type K: Clone + Eq + Hash + Debug + Ord + Send + 'static;
-
- /// Cache value.
- type V: Clone + Debug + Send + 'static;
-
- /// Extra data that is provided during [`GET`](Self::get) but that is NOT
part of the cache key.
- type GetExtra: Debug + Send + 'static;
-
- /// Get value from cache.
- ///
- /// In contrast to [`get`](Self::get) this will only return a value if
there is a stored value.
- /// This will NOT start a new loading task.
- fn get_if_present(&self, k: Self::K) -> Option<Self::V>;
-
- /// Get value from cache.
- ///
- /// Note that `extra` is only used if the key is missing from the storage
backend
- /// and no value loader for this key is running yet.
- async fn get(&self, k: Self::K, extra: Self::GetExtra) -> Self::V {
- self.get_with_status(k, extra).await.0
- }
-
- /// Get value from cache and the [status](CacheGetStatus).
- ///
- /// Note that `extra` is only used if the key is missing from the storage
backend
- /// and no value loader for this key is running yet.
- async fn get_with_status(
- &self,
- k: Self::K,
- extra: Self::GetExtra,
- ) -> (Self::V, CacheGetStatus);
-
- /// Side-load an entry into the cache. If the cache previously contained a
value associated with key,
- /// the old value is replaced by value.
- ///
- /// This will also complete a currently running loader for this key.
- async fn put(&self, k: Self::K, v: Self::V);
-
- /// Discard any cached value for the key.
- ///
- /// This will also interrupt a currently running loader for this key.
- fn invalidate(&self, k: Self::K);
-}
-
-/// Status of a [`Cache`] [GET](LoadingCache::get_with_status) request.
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub enum CacheGetStatus {
- /// The requested entry was present in the storage backend.
- Hit,
-
- /// The requested entry was NOT present in the storage backend and there's
no running value loader.
- Miss,
-
- /// The requested entry was NOT present in the storage backend, but there
was already a running value loader for
- /// this particular key.
- MissAlreadyLoading,
-}
-
-impl CacheGetStatus {
- /// Get human and machine readable name.
- pub fn name(&self) -> &'static str {
- match self {
- Self::Hit => "hit",
- Self::Miss => "miss",
- Self::MissAlreadyLoading => "miss_already_loading",
- }
- }
-}
diff --git a/ballista/cache/src/metrics/loading_cache.rs
b/ballista/cache/src/metrics/loading_cache.rs
deleted file mode 100644
index 92a2e4fc..00000000
--- a/ballista/cache/src/metrics/loading_cache.rs
+++ /dev/null
@@ -1,292 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::loading_cache::CacheGetStatus;
-use std::fmt::Debug;
-use std::hash::Hash;
-use std::marker::PhantomData;
-
-use crate::listener::cache_policy::CachePolicyListener;
-use crate::listener::loading_cache::LoadingCacheListener;
-use std::sync::atomic::{AtomicU64, Ordering};
-use std::sync::Arc;
-
-/// Struct containing all the metrics
-#[derive(Debug)]
-pub struct Metrics<K, V>
-where
- K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
- V: Clone + Debug + Send + 'static,
-{
- get_hit_count: U64Counter,
- get_miss_count: U64Counter,
- get_miss_already_loading_count: U64Counter,
- get_cancelled_count: U64Counter,
- put_count: U64Counter,
- eviction_count: U64Counter,
- _key_marker: PhantomData<K>,
- _value_marker: PhantomData<V>,
-}
-
-impl<K, V> Default for Metrics<K, V>
-where
- K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
- V: Clone + Debug + Send + 'static,
-{
- fn default() -> Self {
- Self::new()
- }
-}
-
-impl<K, V> Metrics<K, V>
-where
- K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
- V: Clone + Debug + Send + 'static,
-{
- pub fn new() -> Self {
- Self {
- get_hit_count: Default::default(),
- get_miss_count: Default::default(),
- get_miss_already_loading_count: Default::default(),
- get_cancelled_count: Default::default(),
- put_count: Default::default(),
- eviction_count: Default::default(),
- _key_marker: Default::default(),
- _value_marker: Default::default(),
- }
- }
-
- pub fn get_hit_count(&self) -> u64 {
- self.get_hit_count.fetch()
- }
-
- pub fn get_miss_count(&self) -> u64 {
- self.get_miss_count.fetch()
- }
-
- pub fn get_miss_already_loading_count(&self) -> u64 {
- self.get_miss_already_loading_count.fetch()
- }
-
- pub fn get_cancelled_count(&self) -> u64 {
- self.get_cancelled_count.fetch()
- }
-
- pub fn put_count(&self) -> u64 {
- self.put_count.fetch()
- }
-
- pub fn eviction_count(&self) -> u64 {
- self.eviction_count.fetch()
- }
-}
-
-// Since we don't store K and V directly, it will be safe.
-unsafe impl<K, V> Sync for Metrics<K, V>
-where
- K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
- V: Clone + Debug + Send + 'static,
-{
-}
-
-impl<K, V> LoadingCacheListener for Metrics<K, V>
-where
- K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
- V: Clone + Debug + Send + 'static,
-{
- type K = K;
- type V = V;
-
- fn listen_on_get_if_present(&self, _k: Self::K, v: Option<Self::V>) {
- if v.is_some() {
- &self.get_hit_count
- } else {
- &self.get_miss_count
- }
- .inc(1);
- }
-
- fn listen_on_get(&self, _k: Self::K, _v: Self::V, status: CacheGetStatus) {
- match status {
- CacheGetStatus::Hit => &self.get_hit_count,
-
- CacheGetStatus::Miss => &self.get_miss_count,
-
- CacheGetStatus::MissAlreadyLoading =>
&self.get_miss_already_loading_count,
- }
- .inc(1);
- }
-
- fn listen_on_put(&self, _k: Self::K, _v: Self::V) {
- // Do nothing
- }
-
- fn listen_on_invalidate(&self, _k: Self::K) {
- // Do nothing
- }
-
- fn listen_on_get_cancelling(&self, _k: Self::K) {
- self.get_cancelled_count.inc(1);
- }
-}
-
-impl<K, V> CachePolicyListener for Metrics<K, V>
-where
- K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
- V: Clone + Debug + Send + 'static,
-{
- type K = K;
- type V = V;
-
- fn listen_on_get(&self, _k: Self::K, _v: Option<Self::V>) {
- // Do nothing
- }
-
- fn listen_on_peek(&self, _k: Self::K, _v: Option<Self::V>) {
- // Do nothing
- }
-
- fn listen_on_put(&self, _k: Self::K, _v: Self::V, _old_v: Option<Self::V>)
{
- self.put_count.inc(1);
- }
-
- fn listen_on_remove(&self, _k: Self::K, _v: Option<Self::V>) {
- self.eviction_count.inc(1);
- }
-
- fn listen_on_pop(&self, _entry: (Self::K, Self::V)) {
- self.eviction_count.inc(1);
- }
-}
-
-/// A monotonic counter
-#[derive(Debug, Clone, Default)]
-pub struct U64Counter {
- counter: Arc<AtomicU64>,
-}
-
-impl U64Counter {
- pub fn inc(&self, count: u64) {
- self.counter.fetch_add(count, Ordering::Relaxed);
- }
-
- pub fn fetch(&self) -> u64 {
- self.counter.load(Ordering::Relaxed)
- }
-}
-
-#[cfg(test)]
-mod tests {
- use crate::backend::policy::lru::lru_cache::LruCache;
- use crate::backend::policy::lru::DefaultResourceCounter;
- use crate::create_loading_cache_with_metrics;
- use crate::loading_cache::loader::CacheLoader;
- use crate::loading_cache::LoadingCache;
- use async_trait::async_trait;
- use std::sync::Arc;
-
- #[tokio::test]
- async fn test_metrics() {
- let cache_policy =
- LruCache::with_resource_counter(DefaultResourceCounter::new(3));
- let loader = TestStringCacheLoader {
- prefix: "file".to_string(),
- };
- let (loading_cache, metrics) =
- create_loading_cache_with_metrics(cache_policy, Arc::new(loader));
-
- assert_eq!(
- "file1".to_string(),
- loading_cache.get("1".to_string(), ()).await
- );
- assert_eq!(
- "file2".to_string(),
- loading_cache.get("2".to_string(), ()).await
- );
- assert_eq!(
- "file3".to_string(),
- loading_cache.get("3".to_string(), ()).await
- );
- assert_eq!(3, metrics.get_miss_count());
-
- assert_eq!(
- "file4".to_string(),
- loading_cache.get("4".to_string(), ()).await
- );
- assert_eq!(0, metrics.get_hit_count());
- assert_eq!(4, metrics.get_miss_count());
- assert_eq!(4, metrics.put_count());
- assert_eq!(1, metrics.eviction_count());
-
- assert!(loading_cache.get_if_present("1".to_string()).is_none());
- assert_eq!(0, metrics.get_hit_count());
- assert_eq!(5, metrics.get_miss_count());
- assert_eq!(4, metrics.put_count());
- assert_eq!(1, metrics.eviction_count());
-
- loading_cache
- .put("2".to_string(), "file2-bak".to_string())
- .await;
- assert_eq!(0, metrics.get_hit_count());
- assert_eq!(5, metrics.get_miss_count());
- assert_eq!(5, metrics.put_count());
- assert_eq!(1, metrics.eviction_count());
-
- assert_eq!(
- "file5".to_string(),
- loading_cache.get("5".to_string(), ()).await
- );
- assert_eq!(0, metrics.get_hit_count());
- assert_eq!(6, metrics.get_miss_count());
- assert_eq!(6, metrics.put_count());
- assert_eq!(2, metrics.eviction_count());
-
- assert!(loading_cache.get_if_present("3".to_string()).is_none());
- assert_eq!(0, metrics.get_hit_count());
- assert_eq!(7, metrics.get_miss_count());
- assert_eq!(6, metrics.put_count());
- assert_eq!(2, metrics.eviction_count());
-
- assert!(loading_cache.get_if_present("2".to_string()).is_some());
- assert_eq!(1, metrics.get_hit_count());
- assert_eq!(7, metrics.get_miss_count());
- assert_eq!(6, metrics.put_count());
- assert_eq!(2, metrics.eviction_count());
-
- loading_cache.invalidate("2".to_string());
- assert_eq!(1, metrics.get_hit_count());
- assert_eq!(7, metrics.get_miss_count());
- assert_eq!(6, metrics.put_count());
- assert_eq!(3, metrics.eviction_count());
- }
-
- #[derive(Debug)]
- struct TestStringCacheLoader {
- prefix: String,
- }
-
- #[async_trait]
- impl CacheLoader for TestStringCacheLoader {
- type K = String;
- type V = String;
- type Extra = ();
-
- async fn load(&self, k: Self::K, _extra: Self::Extra) -> Self::V {
- format!("{}{k}", self.prefix)
- }
- }
-}
diff --git a/ballista/cache/src/metrics/mod.rs
b/ballista/cache/src/metrics/mod.rs
deleted file mode 100644
index c1b6bcbb..00000000
--- a/ballista/cache/src/metrics/mod.rs
+++ /dev/null
@@ -1,18 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-pub mod loading_cache;
diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml
index daabe8a2..884b3f60 100644
--- a/ballista/core/Cargo.toml
+++ b/ballista/core/Cargo.toml
@@ -48,7 +48,6 @@ s3 = ["object_store/aws"]
ahash = { version = "0.8", default-features = false }
arrow-flight = { workspace = true }
async-trait = "0.1.41"
-ballista-cache = { path = "../cache", version = "0.12.0" }
bytes = "1.0"
chrono = { version = "0.4", default-features = false }
clap = { workspace = true }
diff --git a/ballista/core/src/cache_layer/medium/local_disk.rs
b/ballista/core/src/cache_layer/medium/local_disk.rs
deleted file mode 100644
index e30d8089..00000000
--- a/ballista/core/src/cache_layer/medium/local_disk.rs
+++ /dev/null
@@ -1,69 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::cache_layer::medium::CacheMedium;
-use crate::cache_layer::object_store::ObjectStoreWithKey;
-use object_store::local::LocalFileSystem;
-use object_store::path::{Path, DELIMITER};
-use object_store::ObjectStore;
-use std::any::Any;
-use std::fmt::{Display, Formatter};
-use std::sync::Arc;
-
-#[derive(Debug, Clone)]
-pub struct LocalDiskMedium {
- cache_object_store: Arc<LocalFileSystem>,
- root_cache_dir: Path,
-}
-
-impl LocalDiskMedium {
- pub fn new(root_cache_dir: String) -> Self {
- Self {
- cache_object_store: Arc::new(LocalFileSystem::new()),
- root_cache_dir: Path::from(root_cache_dir),
- }
- }
-}
-
-impl Display for LocalDiskMedium {
- fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- write!(f, "Cache medium with local disk({})", self.root_cache_dir)
- }
-}
-
-impl CacheMedium for LocalDiskMedium {
- fn as_any(&self) -> &dyn Any {
- self
- }
-
- fn get_object_store(&self) -> Arc<dyn ObjectStore> {
- self.cache_object_store.clone()
- }
-
- fn get_mapping_location(
- &self,
- source_location: &Path,
- source_object_store: &ObjectStoreWithKey,
- ) -> Path {
- let cache_location = format!(
- "{}{DELIMITER}{}{DELIMITER}{source_location}",
- self.root_cache_dir,
- source_object_store.key(),
- );
- Path::from(cache_location)
- }
-}
diff --git a/ballista/core/src/cache_layer/medium/local_memory.rs
b/ballista/core/src/cache_layer/medium/local_memory.rs
deleted file mode 100644
index 2b89392e..00000000
--- a/ballista/core/src/cache_layer/medium/local_memory.rs
+++ /dev/null
@@ -1,73 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::cache_layer::medium::CacheMedium;
-use crate::cache_layer::object_store::ObjectStoreWithKey;
-use object_store::memory::InMemory;
-use object_store::path::{Path, DELIMITER};
-use object_store::ObjectStore;
-use std::any::Any;
-use std::fmt::{Display, Formatter};
-use std::sync::Arc;
-
-#[derive(Debug, Clone)]
-pub struct LocalMemoryMedium {
- cache_object_store: Arc<InMemory>,
-}
-
-impl LocalMemoryMedium {
- pub fn new() -> Self {
- Self {
- cache_object_store: Arc::new(InMemory::new()),
- }
- }
-}
-
-impl Default for LocalMemoryMedium {
- fn default() -> Self {
- Self::new()
- }
-}
-
-impl Display for LocalMemoryMedium {
- fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- write!(f, "Cache medium with local memory")
- }
-}
-
-impl CacheMedium for LocalMemoryMedium {
- fn as_any(&self) -> &dyn Any {
- self
- }
-
- fn get_object_store(&self) -> Arc<dyn ObjectStore> {
- self.cache_object_store.clone()
- }
-
- fn get_mapping_location(
- &self,
- source_location: &Path,
- source_object_store: &ObjectStoreWithKey,
- ) -> Path {
- let cache_location = format!(
- "{}{DELIMITER}{}{DELIMITER}{source_location}",
- "memory",
- source_object_store.key(),
- );
- Path::from(cache_location)
- }
-}
diff --git a/ballista/core/src/cache_layer/medium/mod.rs
b/ballista/core/src/cache_layer/medium/mod.rs
deleted file mode 100644
index f2082e23..00000000
--- a/ballista/core/src/cache_layer/medium/mod.rs
+++ /dev/null
@@ -1,42 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::cache_layer::object_store::ObjectStoreWithKey;
-use object_store::path::Path;
-use object_store::ObjectStore;
-use std::any::Any;
-use std::fmt::{Debug, Display};
-use std::sync::Arc;
-
-pub mod local_disk;
-pub mod local_memory;
-
-pub trait CacheMedium: Debug + Send + Sync + Display + 'static {
- /// Returns the cache layer policy as [`Any`](std::any::Any) so that it
can be
- /// downcast to a specific implementation.
- fn as_any(&self) -> &dyn Any;
-
- /// Get the ObjectStore for the cache storage
- fn get_object_store(&self) -> Arc<dyn ObjectStore>;
-
- /// Get the mapping location on the cache ObjectStore for the source
location on the source ObjectStore
- fn get_mapping_location(
- &self,
- source_location: &Path,
- source_object_store: &ObjectStoreWithKey,
- ) -> Path;
-}
diff --git a/ballista/core/src/cache_layer/mod.rs
b/ballista/core/src/cache_layer/mod.rs
deleted file mode 100644
index 86e52395..00000000
--- a/ballista/core/src/cache_layer/mod.rs
+++ /dev/null
@@ -1,128 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::cache_layer::medium::local_disk::LocalDiskMedium;
-use crate::cache_layer::medium::local_memory::LocalMemoryMedium;
-use crate::cache_layer::policy::file::FileCacheLayer;
-use std::sync::Arc;
-
-pub mod medium;
-pub mod object_store;
-pub mod policy;
-
-#[derive(Debug, Clone)]
-pub enum CacheLayer {
- /// The local disk will be used as the cache layer medium
- /// and the cache level will be the whole file.
- LocalDiskFile(Arc<FileCacheLayer<LocalDiskMedium>>),
-
- /// The local memory will be used as the cache layer medium
- /// and the cache level will be the whole file.
- LocalMemoryFile(Arc<FileCacheLayer<LocalMemoryMedium>>),
-}
-
-#[cfg(test)]
-mod tests {
- use ballista_cache::loading_cache::LoadingCache;
- use futures::TryStreamExt;
- use object_store::local::LocalFileSystem;
- use object_store::path::Path;
- use object_store::{GetResultPayload, ObjectStore};
- use std::io::Write;
- use std::sync::Arc;
- use tempfile::NamedTempFile;
-
- use crate::cache_layer::medium::local_memory::LocalMemoryMedium;
- use crate::cache_layer::object_store::file::FileCacheObjectStore;
- use crate::cache_layer::object_store::ObjectStoreWithKey;
- use crate::cache_layer::policy::file::FileCacheLayer;
- use crate::error::{BallistaError, Result};
-
- #[tokio::test]
- async fn test_cache_file_to_memory() -> Result<()> {
- let test_data = "test_cache_file_to_memory";
- let test_bytes = test_data.as_bytes();
-
- let mut test_file = NamedTempFile::new()?;
- let source_location = Path::from(test_file.as_ref().to_str().unwrap());
-
- let test_size = test_file.write(test_bytes)?;
- assert_eq!(test_bytes.len(), test_size);
-
- // Check the testing data on the source object store
- let source_object_store = Arc::new(LocalFileSystem::new());
- let source_key = "file";
- let source_object_store_with_key = Arc::new(ObjectStoreWithKey::new(
- source_key.to_string(),
- source_object_store.clone(),
- ));
- let actual_source =
source_object_store.get(&source_location).await.unwrap();
- match actual_source.payload {
- GetResultPayload::File(file, _) => {
- assert_eq!(test_bytes.len(), file.metadata()?.len() as usize);
- }
- _ => {
- return Err(BallistaError::General(
- "File instead of data stream should be
returned".to_string(),
- ))
- }
- }
-
- // Check the testing data on the cache object store
- let cache_medium = LocalMemoryMedium::new();
- let cache_layer = FileCacheLayer::new(1000, 1, cache_medium);
- let cache_meta = cache_layer
- .cache()
- .get(
- source_location.clone(),
- source_object_store_with_key.clone(),
- )
- .await;
- assert_eq!(test_bytes.len(), cache_meta.size);
-
- let cache_object_store = FileCacheObjectStore::new(
- Arc::new(cache_layer),
- source_object_store_with_key.clone(),
- );
- let actual_cache =
cache_object_store.get(&source_location).await.unwrap();
- match actual_cache.payload {
- GetResultPayload::File(_, _) => {
- return Err(BallistaError::General(
- "Data stream instead of file should be
returned".to_string(),
- ))
- }
- GetResultPayload::Stream(s) => {
- let mut buf: Vec<u8> = vec![];
- s.try_fold(&mut buf, |acc, part| async move {
- let mut part: Vec<u8> = part.into();
- acc.append(&mut part);
- Ok(acc)
- })
- .await
- .unwrap();
- let actual_cache_data = String::from_utf8(buf).unwrap();
- assert_eq!(test_data, actual_cache_data);
- }
- }
-
- test_file.close()?;
-
- std::mem::forget(cache_object_store);
-
- Ok(())
- }
-}
diff --git a/ballista/core/src/cache_layer/object_store/file.rs
b/ballista/core/src/cache_layer/object_store/file.rs
deleted file mode 100644
index 8229af1c..00000000
--- a/ballista/core/src/cache_layer/object_store/file.rs
+++ /dev/null
@@ -1,267 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::cache_layer::medium::CacheMedium;
-use crate::cache_layer::object_store::ObjectStoreWithKey;
-use crate::cache_layer::policy::file::FileCacheLayer;
-use crate::error::BallistaError;
-use async_trait::async_trait;
-use ballista_cache::loading_cache::LoadingCache;
-use bytes::Bytes;
-use futures::stream::{self, BoxStream, StreamExt};
-use log::info;
-use object_store::path::Path;
-use object_store::{
- Error, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta,
ObjectStore,
- PutMultipartOpts, PutOptions, PutPayload, PutResult,
-};
-use std::fmt::{Debug, Display, Formatter};
-use std::ops::Range;
-use std::sync::Arc;
-
-#[derive(Debug)]
-pub struct FileCacheObjectStore<M>
-where
- M: CacheMedium,
-{
- cache_layer: Arc<FileCacheLayer<M>>,
- inner: Arc<ObjectStoreWithKey>,
-}
-
-impl<M> FileCacheObjectStore<M>
-where
- M: CacheMedium,
-{
- pub fn new(
- cache_layer: Arc<FileCacheLayer<M>>,
- inner: Arc<ObjectStoreWithKey>,
- ) -> Self {
- Self { cache_layer, inner }
- }
-}
-
-impl<M> Display for FileCacheObjectStore<M>
-where
- M: CacheMedium,
-{
- fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- write!(
- f,
- "Object store {} with file level cache on {}",
- self.inner,
- self.cache_layer.cache_store()
- )
- }
-}
-
-#[async_trait]
-impl<M> ObjectStore for FileCacheObjectStore<M>
-where
- M: CacheMedium,
-{
- async fn put(
- &self,
- _location: &Path,
- _bytes: PutPayload,
- ) -> object_store::Result<PutResult> {
- Err(Error::NotSupported {
- source: Box::new(BallistaError::General(
- "Write path is not supported".to_string(),
- )),
- })
- }
-
- async fn put_opts(
- &self,
- _location: &Path,
- _bytes: PutPayload,
- _opts: PutOptions,
- ) -> object_store::Result<PutResult> {
- Err(Error::NotSupported {
- source: Box::new(BallistaError::General(
- "Write path is not supported".to_string(),
- )),
- })
- }
-
- async fn put_multipart(
- &self,
- _location: &Path,
- ) -> object_store::Result<Box<dyn MultipartUpload>> {
- Err(Error::NotSupported {
- source: Box::new(BallistaError::General(
- "Write path is not supported".to_string(),
- )),
- })
- }
-
- async fn put_multipart_opts(
- &self,
- _location: &Path,
- _opts: PutMultipartOpts,
- ) -> object_store::Result<Box<dyn MultipartUpload>> {
- Err(Error::NotSupported {
- source: Box::new(BallistaError::General(
- "Write path is not supported".to_string(),
- )),
- })
- }
-
- /// If it already exists in cache, use the cached result.
- /// Otherwise, trigger a task to load the data into cache; At the
meanwhile,
- /// get the result from the data source
- async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
- if let Some(cache_object_mata) =
- self.cache_layer.cache().get_if_present(location.clone())
- {
- info!("Data for {} is cached", location);
- let cache_location = &cache_object_mata.location;
- self.cache_layer.cache_store().get(cache_location).await
- } else {
- let io_runtime = self.cache_layer.io_runtime();
- let cache_layer = self.cache_layer.clone();
- let key = location.clone();
- let extra = self.inner.clone();
- io_runtime.spawn(async move {
- info!("Going to cache data for {}", key);
- cache_layer.cache().get(key.clone(), extra).await;
- info!("Data for {} has been cached", key);
- });
- self.inner.get(location).await
- }
- }
-
- async fn get_opts(
- &self,
- location: &Path,
- options: GetOptions,
- ) -> object_store::Result<GetResult> {
- if let Some(cache_object_mata) =
- self.cache_layer.cache().get_if_present(location.clone())
- {
- info!("Data for {} is cached", location);
- let cache_location = &cache_object_mata.location;
- self.cache_layer
- .cache_store()
- .get_opts(cache_location, options)
- .await
- } else {
- let io_runtime = self.cache_layer.io_runtime();
- let cache_layer = self.cache_layer.clone();
- let key = location.clone();
- let extra = self.inner.clone();
- io_runtime.spawn(async move {
- info!("Going to cache data for {}", key);
- cache_layer.cache().get(key.clone(), extra).await;
- info!("Data for {} has been cached", key);
- });
- self.inner.get_opts(location, options).await
- }
- }
-
- /// If it already exists in cache, use the cached result.
- /// Otherwise, trigger a task to load the data into cache; At the
meanwhile,
- /// get the result from the data source
- async fn get_range(
- &self,
- location: &Path,
- range: Range<usize>,
- ) -> object_store::Result<Bytes> {
- if let Some(cache_object_mata) =
- self.cache_layer.cache().get_if_present(location.clone())
- {
- info!("Data for {} is cached", location);
- let cache_location = &cache_object_mata.location;
- self.cache_layer
- .cache_store()
- .get_range(cache_location, range)
- .await
- } else {
- let io_runtime = self.cache_layer.io_runtime();
- let cache_layer = self.cache_layer.clone();
- let key = location.clone();
- let extra = self.inner.clone();
- io_runtime.spawn(async move {
- info!("Going to cache data for {}", key);
- cache_layer.cache().get(key.clone(), extra).await;
- info!("Data for {} has been cached", key);
- });
- self.inner.get_range(location, range).await
- }
- }
-
- /// If it already exists in cache, use the cached result.
- /// Otherwise, get the result from the data source.
- /// It will not trigger the task to load data into cache.
- async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
- if let Some(cache_object_mata) =
- self.cache_layer.cache().get_if_present(location.clone())
- {
- let cache_location = &cache_object_mata.location;
- self.cache_layer.cache_store().head(cache_location).await
- } else {
- self.inner.head(location).await
- }
- }
-
- async fn delete(&self, _location: &Path) -> object_store::Result<()> {
- Err(Error::NotSupported {
- source: Box::new(BallistaError::General(
- "Delete is not supported".to_string(),
- )),
- })
- }
-
- fn list(
- &self,
- _prefix: Option<&Path>,
- ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
- stream::once(async {
- Err(Error::NotSupported {
- source: Box::new(BallistaError::General(
- "List is not supported".to_string(),
- )),
- })
- })
- .boxed()
- }
-
- async fn list_with_delimiter(
- &self,
- _prefix: Option<&Path>,
- ) -> object_store::Result<ListResult> {
- Err(Error::NotSupported {
- source: Box::new(BallistaError::General("List is not
supported".to_string())),
- })
- }
-
- async fn copy(&self, _from: &Path, _to: &Path) -> object_store::Result<()>
{
- Err(Error::NotSupported {
- source: Box::new(BallistaError::General("Copy is not
supported".to_string())),
- })
- }
-
- async fn copy_if_not_exists(
- &self,
- _from: &Path,
- _to: &Path,
- ) -> object_store::Result<()> {
- Err(Error::NotSupported {
- source: Box::new(BallistaError::General("Copy is not
supported".to_string())),
- })
- }
-}
diff --git a/ballista/core/src/cache_layer/object_store/mod.rs
b/ballista/core/src/cache_layer/object_store/mod.rs
deleted file mode 100644
index 71a3464e..00000000
--- a/ballista/core/src/cache_layer/object_store/mod.rs
+++ /dev/null
@@ -1,168 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-pub mod file;
-
-use async_trait::async_trait;
-use bytes::Bytes;
-use futures::stream::BoxStream;
-use object_store::path::Path;
-use object_store::{
- GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta,
ObjectStore,
- PutMultipartOpts, PutOptions, PutPayload, PutResult,
-};
-use std::fmt::{Debug, Display, Formatter};
-use std::ops::Range;
-use std::sync::Arc;
-
-/// An [`ObjectStore`] wrapper with a specific key which is used for
registration in [`ObjectStoreRegistry`].
-///
-/// The [`key`] can be used for the cache path prefix.
-#[derive(Debug)]
-pub struct ObjectStoreWithKey {
- key: String,
- inner: Arc<dyn ObjectStore>,
-}
-
-impl ObjectStoreWithKey {
- pub fn new(key: String, inner: Arc<dyn ObjectStore>) -> Self {
- Self { key, inner }
- }
-
- pub fn key(&self) -> &str {
- &self.key
- }
-}
-
-impl Display for ObjectStoreWithKey {
- fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- write!(
- f,
- "Registered object store {} with key {}",
- self.inner, self.key,
- )
- }
-}
-
-#[async_trait]
-impl ObjectStore for ObjectStoreWithKey {
- async fn put(
- &self,
- location: &Path,
- bytes: PutPayload,
- ) -> object_store::Result<PutResult> {
- self.inner.put(location, bytes).await
- }
-
- async fn put_opts(
- &self,
- location: &Path,
- bytes: PutPayload,
- opts: PutOptions,
- ) -> object_store::Result<PutResult> {
- self.inner.put_opts(location, bytes, opts).await
- }
-
- async fn put_multipart(
- &self,
- location: &Path,
- ) -> object_store::Result<Box<dyn MultipartUpload>> {
- self.inner.put_multipart(location).await
- }
-
- async fn put_multipart_opts(
- &self,
- location: &Path,
- opts: PutMultipartOpts,
- ) -> object_store::Result<Box<dyn MultipartUpload>> {
- self.inner.put_multipart_opts(location, opts).await
- }
-
- async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
- self.inner.get(location).await
- }
-
- async fn get_opts(
- &self,
- location: &Path,
- options: GetOptions,
- ) -> object_store::Result<GetResult> {
- self.inner.get_opts(location, options).await
- }
-
- async fn get_range(
- &self,
- location: &Path,
- range: Range<usize>,
- ) -> object_store::Result<Bytes> {
- self.inner.get_range(location, range).await
- }
-
- async fn get_ranges(
- &self,
- location: &Path,
- ranges: &[Range<usize>],
- ) -> object_store::Result<Vec<Bytes>> {
- self.inner.get_ranges(location, ranges).await
- }
-
- async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
- self.inner.head(location).await
- }
-
- async fn delete(&self, location: &Path) -> object_store::Result<()> {
- self.inner.delete(location).await
- }
-
- fn list(
- &self,
- prefix: Option<&Path>,
- ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
- self.inner.list(prefix)
- }
-
- async fn list_with_delimiter(
- &self,
- prefix: Option<&Path>,
- ) -> object_store::Result<ListResult> {
- self.inner.list_with_delimiter(prefix).await
- }
-
- async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
- self.inner.copy(from, to).await
- }
-
- async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()>
{
- self.inner.rename(from, to).await
- }
-
- async fn copy_if_not_exists(
- &self,
- from: &Path,
- to: &Path,
- ) -> object_store::Result<()> {
- self.inner.copy_if_not_exists(from, to).await
- }
-
- async fn rename_if_not_exists(
- &self,
- from: &Path,
- to: &Path,
- ) -> object_store::Result<()> {
- self.inner.rename_if_not_exists(from, to).await
- }
-}
diff --git a/ballista/core/src/cache_layer/policy/file.rs
b/ballista/core/src/cache_layer/policy/file.rs
deleted file mode 100644
index af4865d9..00000000
--- a/ballista/core/src/cache_layer/policy/file.rs
+++ /dev/null
@@ -1,302 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::cache_layer::medium::CacheMedium;
-use crate::cache_layer::object_store::ObjectStoreWithKey;
-use crate::error::{BallistaError, Result};
-use async_trait::async_trait;
-use ballista_cache::backend::policy::lru::lru_cache::LruCache;
-use ballista_cache::backend::policy::lru::ResourceCounter;
-use ballista_cache::listener::cache_policy::{
- CachePolicyListener, CachePolicyWithListener,
-};
-use ballista_cache::loading_cache::loader::CacheLoader;
-use ballista_cache::{
- create_loading_cache_with_metrics, DefaultLoadingCache,
LoadingCacheMetrics,
-};
-use log::{error, info, warn};
-use object_store::path::Path;
-use object_store::{ObjectMeta, ObjectStore, PutPayload};
-use std::ops::Range;
-use std::sync::Arc;
-use tokio::runtime::Runtime;
-
-type DefaultFileLoadingCache<M> =
- DefaultLoadingCache<Path, ObjectMeta, FileCacheLoader<M>>;
-type FileCacheMetrics = LoadingCacheMetrics<Path, ObjectMeta>;
-
-#[derive(Debug)]
-pub struct FileCacheLayer<M>
-where
- M: CacheMedium,
-{
- cache_store: Arc<dyn ObjectStore>,
- loading_cache: DefaultFileLoadingCache<M>,
- io_runtime: Runtime,
- metrics: Arc<FileCacheMetrics>,
-}
-
-impl<M> FileCacheLayer<M>
-where
- M: CacheMedium,
-{
- pub fn new(capacity: usize, cache_io_concurrency: u32, cache_medium: M) ->
Self {
- let cache_store = cache_medium.get_object_store();
-
- let cache_counter = FileCacheCounter::new(capacity);
- let lru_cache = LruCache::with_resource_counter(cache_counter);
- let file_cache_loader = Arc::new(FileCacheLoader::new(cache_medium));
- let cache_with_removal_listener =
- CachePolicyWithListener::new(lru_cache,
vec![file_cache_loader.clone()]);
- let (loading_cache, metrics) = create_loading_cache_with_metrics(
- cache_with_removal_listener,
- file_cache_loader,
- );
- let io_runtime = tokio::runtime::Builder::new_multi_thread()
- .enable_all()
- .thread_name("loading_cache")
- .worker_threads(cache_io_concurrency as usize)
- .build()
- .expect("Creating tokio runtime");
-
- Self {
- cache_store,
- loading_cache,
- io_runtime,
- metrics,
- }
- }
-
- pub fn cache_store(&self) -> Arc<dyn ObjectStore> {
- self.cache_store.clone()
- }
-
- pub fn cache(&self) -> &DefaultFileLoadingCache<M> {
- &self.loading_cache
- }
-
- pub fn io_runtime(&self) -> &Runtime {
- &self.io_runtime
- }
-
- pub fn metrics(&self) -> &FileCacheMetrics {
- self.metrics.as_ref()
- }
-}
-
-#[derive(Debug)]
-pub struct FileCacheLoader<M>
-where
- M: CacheMedium,
-{
- cache_medium: Arc<M>,
-}
-
-impl<M> FileCacheLoader<M>
-where
- M: CacheMedium,
-{
- fn new(cache_medium: M) -> Self {
- Self {
- cache_medium: Arc::new(cache_medium),
- }
- }
-
- fn remove_object(&self, source_path: Path, object_meta: ObjectMeta) {
- let cache_store = self.cache_medium.get_object_store();
- let location = object_meta.location;
- tokio::runtime::Handle::try_current().unwrap().block_on( async {
- if let Err(e) = cache_store.delete(&location).await {
- error!("Fail to delete file {location} on the cache
ObjectStore for source {source_path} due to {e}");
- }
- });
- }
-}
-
-/// Will return the location of the cached file on the cache object store.
-///
-/// The last_modified of the ObjectMeta will be from the source file, which
will be useful
-/// for checking whether the source file changed or not.
-///
-/// The size will be the one of cached file rather than the one of the source
file in case of changing the data format
-async fn load_object<M>(
- cache_medium: Arc<M>,
- source_location: Path,
- source_store: &ObjectStoreWithKey,
-) -> Result<ObjectMeta>
-where
- M: CacheMedium,
-{
- let source_meta = source_store.head(&source_location).await.map_err(|e| {
- BallistaError::General(format!(
- "Fail to read head info for {source_location} due to {e}"
- ))
- })?;
-
- let cache_store = cache_medium.get_object_store();
- let cache_location =
- cache_medium.get_mapping_location(&source_location, source_store);
-
- // Check whether the cache location exist or not. If exists, delete it
first.
- if cache_store.head(&cache_location).await.is_ok() {
- if let Err(e) = cache_store.delete(&cache_location).await {
- error!(
- "Fail to delete file {cache_location} on the cache ObjectStore
due to {e}"
- );
- }
- }
-
- info!(
- "Going to cache object from {} to {}",
- source_location, cache_location
- );
- let range = Range {
- start: 0,
- end: source_meta.size,
- };
- let data = source_store
- .get_range(&source_location, range)
- .await
- .map_err(|e| {
- BallistaError::General(format!(
- "Fail to get file data from {source_location} due to {e}"
- ))
- })?;
- info!(
- "{} bytes will be cached for {}",
- data.len(),
- source_location
- );
- cache_store
- .put(&cache_location, PutPayload::from_bytes(data))
- .await
- .map_err(|e| {
- BallistaError::General(format!(
- "Fail to write out data to {cache_location} due to {e}"
- ))
- })?;
- info!(
- "Object {} has already been cached to {}",
- source_location, cache_location
- );
-
- let cache_meta = cache_store.head(&cache_location).await.map_err(|e| {
- BallistaError::General(format!(
- "Fail to read head info for {cache_location} due to {e}"
- ))
- })?;
-
- Ok(ObjectMeta {
- location: cache_location,
- last_modified: source_meta.last_modified,
- size: cache_meta.size,
- e_tag: source_meta.e_tag,
- version: None,
- })
-}
-
-#[async_trait]
-impl<M> CacheLoader for FileCacheLoader<M>
-where
- M: CacheMedium,
-{
- type K = Path;
- type V = ObjectMeta;
- type Extra = Arc<ObjectStoreWithKey>;
-
- async fn load(&self, source_location: Self::K, source_store: Self::Extra)
-> Self::V {
- match load_object(self.cache_medium.clone(), source_location,
&source_store).await
- {
- Ok(object_meta) => object_meta,
- Err(e) => panic!("{}", e),
- }
- }
-}
-
-impl<M> CachePolicyListener for FileCacheLoader<M>
-where
- M: CacheMedium,
-{
- type K = Path;
- type V = ObjectMeta;
-
- fn listen_on_get(&self, _k: Self::K, _v: Option<Self::V>) {
- // Do nothing
- }
-
- fn listen_on_peek(&self, _k: Self::K, _v: Option<Self::V>) {
- // Do nothing
- }
-
- fn listen_on_put(&self, _k: Self::K, _v: Self::V, _old_v: Option<Self::V>)
{
- // Do nothing
- }
-
- fn listen_on_remove(&self, k: Self::K, v: Option<Self::V>) {
- if let Some(v) = v {
- self.remove_object(k, v);
- } else {
- warn!("The entry does not exist for key {k}");
- }
- }
-
- fn listen_on_pop(&self, entry: (Self::K, Self::V)) {
- self.remove_object(entry.0, entry.1);
- }
-}
-
-#[derive(Debug, Clone, Copy)]
-pub struct FileCacheCounter {
- /// The maximum data size to be cached
- capacity: usize,
- /// The data size already be cached
- cached_size: usize,
-}
-
-impl FileCacheCounter {
- pub fn new(capacity: usize) -> Self {
- FileCacheCounter {
- capacity,
- cached_size: 0,
- }
- }
-
- pub fn capacity(&self) -> usize {
- self.capacity
- }
-
- pub fn cached_size(&self) -> usize {
- self.cached_size
- }
-}
-
-impl ResourceCounter for FileCacheCounter {
- type K = Path;
- type V = ObjectMeta;
-
- fn consume(&mut self, _k: &Self::K, v: &Self::V) {
- self.cached_size += v.size;
- }
-
- fn restore(&mut self, _k: &Self::K, v: &Self::V) {
- self.cached_size -= v.size;
- }
-
- fn exceed_capacity(&self) -> bool {
- self.cached_size > self.capacity
- }
-}
diff --git a/ballista/core/src/cache_layer/policy/mod.rs
b/ballista/core/src/cache_layer/policy/mod.rs
deleted file mode 100644
index 0cb4c86b..00000000
--- a/ballista/core/src/cache_layer/policy/mod.rs
+++ /dev/null
@@ -1,18 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-pub mod file;
diff --git a/ballista/core/src/config.rs b/ballista/core/src/config.rs
index 46424ecf..746a0be9 100644
--- a/ballista/core/src/config.rs
+++ b/ballista/core/src/config.rs
@@ -37,8 +37,6 @@ pub const BALLISTA_REPARTITION_AGGREGATIONS: &str =
"ballista.repartition.aggreg
pub const BALLISTA_REPARTITION_WINDOWS: &str = "ballista.repartition.windows";
pub const BALLISTA_PARQUET_PRUNING: &str = "ballista.parquet.pruning";
pub const BALLISTA_COLLECT_STATISTICS: &str = "ballista.collect_statistics";
-/// Indicate whether to enable to data cache for a task
-pub const BALLISTA_DATA_CACHE_ENABLED: &str = "ballista.data_cache.enabled";
pub const BALLISTA_WITH_INFORMATION_SCHEMA: &str =
"ballista.with_information_schema";
/// give a plugin files dir, and then the dynamic library files in this dir
will be load when scheduler state init.
diff --git a/ballista/core/src/lib.rs b/ballista/core/src/lib.rs
index 5306e8b9..c52d2ef4 100644
--- a/ballista/core/src/lib.rs
+++ b/ballista/core/src/lib.rs
@@ -22,8 +22,6 @@ pub fn print_version() {
println!("Ballista version: {BALLISTA_VERSION}")
}
-#[cfg(not(windows))]
-pub mod cache_layer;
pub mod client;
pub mod config;
pub mod consistent_hash;
diff --git a/ballista/core/src/object_store_registry/cache.rs
b/ballista/core/src/object_store_registry/cache.rs
deleted file mode 100644
index 621e30a1..00000000
--- a/ballista/core/src/object_store_registry/cache.rs
+++ /dev/null
@@ -1,86 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::cache_layer::object_store::file::FileCacheObjectStore;
-use crate::cache_layer::object_store::ObjectStoreWithKey;
-use crate::cache_layer::CacheLayer;
-use crate::object_store_registry::BallistaObjectStoreRegistry;
-use datafusion::datasource::object_store::ObjectStoreRegistry;
-use datafusion::execution::runtime_env::RuntimeConfig;
-use object_store::ObjectStore;
-use std::sync::Arc;
-use url::Url;
-
-/// Get a RuntimeConfig with CachedBasedObjectStoreRegistry
-pub fn with_cache_layer(config: RuntimeConfig, cache_layer: CacheLayer) ->
RuntimeConfig {
- let registry = Arc::new(BallistaObjectStoreRegistry::default());
- let registry = Arc::new(CachedBasedObjectStoreRegistry::new(registry,
cache_layer));
- config.with_object_store_registry(registry)
-}
-
-/// An object store registry wrapped an existing one with a cache layer.
-///
-/// During [`get_store`], after getting the source [`ObjectStore`], based on
the url,
-/// it will firstly be wrapped with a key which will be used as the cache
prefix path.
-/// And then it will be wrapped with the [`cache_layer`].
-#[derive(Debug)]
-pub struct CachedBasedObjectStoreRegistry {
- inner: Arc<dyn ObjectStoreRegistry>,
- cache_layer: CacheLayer,
-}
-
-impl CachedBasedObjectStoreRegistry {
- pub fn new(inner: Arc<dyn ObjectStoreRegistry>, cache_layer: CacheLayer)
-> Self {
- Self { inner, cache_layer }
- }
-}
-
-impl ObjectStoreRegistry for CachedBasedObjectStoreRegistry {
- fn register_store(
- &self,
- url: &Url,
- store: Arc<dyn ObjectStore>,
- ) -> Option<Arc<dyn ObjectStore>> {
- self.inner.register_store(url, store)
- }
-
- fn get_store(&self, url: &Url) -> datafusion::common::Result<Arc<dyn
ObjectStore>> {
- let source_object_store = self.inner.get_store(url)?;
- let object_store_with_key = Arc::new(ObjectStoreWithKey::new(
- get_url_key(url),
- source_object_store,
- ));
- Ok(match &self.cache_layer {
- CacheLayer::LocalDiskFile(cache_layer) => Arc::new(
- FileCacheObjectStore::new(cache_layer.clone(),
object_store_with_key),
- ),
- CacheLayer::LocalMemoryFile(cache_layer) => Arc::new(
- FileCacheObjectStore::new(cache_layer.clone(),
object_store_with_key),
- ),
- })
- }
-}
-
-/// Get the key of a url for object store cache prefix path.
-/// The credential info will be removed.
-fn get_url_key(url: &Url) -> String {
- format!(
- "{}://{}",
- url.scheme(),
- &url[url::Position::BeforeHost..url::Position::AfterPort],
- )
-}
diff --git a/ballista/core/src/object_store_registry/mod.rs
b/ballista/core/src/object_store_registry/mod.rs
index be527e71..2e394a7c 100644
--- a/ballista/core/src/object_store_registry/mod.rs
+++ b/ballista/core/src/object_store_registry/mod.rs
@@ -15,9 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-#[cfg(not(windows))]
-pub mod cache;
-
use datafusion::common::DataFusionError;
use datafusion::datasource::object_store::{
DefaultObjectStoreRegistry, ObjectStoreRegistry,
diff --git a/ballista/executor/src/execution_loop.rs
b/ballista/executor/src/execution_loop.rs
index 0c169e2d..591c5c45 100644
--- a/ballista/executor/src/execution_loop.rs
+++ b/ballista/executor/src/execution_loop.rs
@@ -208,7 +208,7 @@ async fn run_received_task<T: 'static + AsLogicalPlan, U:
'static + AsExecutionP
for window_func in executor.window_functions.clone() {
task_window_functions.insert(window_func.0, window_func.1);
}
- let runtime = executor.get_runtime(false);
+ let runtime = executor.get_runtime();
let session_id = task.session_id.clone();
let task_context = Arc::new(TaskContext::new(
Some(task_identity.clone()),
diff --git a/ballista/executor/src/executor.rs
b/ballista/executor/src/executor.rs
index 4e83b125..8ae8e6aa 100644
--- a/ballista/executor/src/executor.rs
+++ b/ballista/executor/src/executor.rs
@@ -75,11 +75,6 @@ pub struct Executor {
/// Runtime environment for Executor
runtime: Arc<RuntimeEnv>,
- /// Runtime environment for Executor with data cache.
- /// The difference with [`runtime`] is that it leverages a different
[`object_store_registry`].
- /// And others things are shared with [`runtime`].
- runtime_with_data_cache: Option<Arc<RuntimeEnv>>,
-
/// Collector for runtime execution metrics
pub metrics_collector: Arc<dyn ExecutorMetricsCollector>,
@@ -100,7 +95,6 @@ impl Executor {
metadata: ExecutorRegistration,
work_dir: &str,
runtime: Arc<RuntimeEnv>,
- runtime_with_data_cache: Option<Arc<RuntimeEnv>>,
metrics_collector: Arc<dyn ExecutorMetricsCollector>,
concurrent_tasks: usize,
execution_engine: Option<Arc<dyn ExecutionEngine>>,
@@ -123,7 +117,6 @@ impl Executor {
// TODO: set to default window functions when they are moved to
udwf
window_functions: HashMap::new(),
runtime,
- runtime_with_data_cache,
metrics_collector,
concurrent_tasks,
abort_handles: Default::default(),
@@ -134,16 +127,8 @@ impl Executor {
}
impl Executor {
- pub fn get_runtime(&self, data_cache: bool) -> Arc<RuntimeEnv> {
- if data_cache {
- if let Some(runtime) = self.runtime_with_data_cache.clone() {
- runtime
- } else {
- self.runtime.clone()
- }
- } else {
- self.runtime.clone()
- }
+ pub fn get_runtime(&self) -> Arc<RuntimeEnv> {
+ self.runtime.clone()
}
/// Execute one partition of a query stage and persist the result to disk
in IPC format. On
@@ -363,7 +348,6 @@ mod test {
executor_registration,
&work_dir,
ctx.runtime_env(),
- None,
Arc::new(LoggingMetricsCollector {}),
2,
None,
diff --git a/ballista/executor/src/executor_process.rs
b/ballista/executor/src/executor_process.rs
index f438e2f6..c19f0656 100644
--- a/ballista/executor/src/executor_process.rs
+++ b/ballista/executor/src/executor_process.rs
@@ -40,14 +40,8 @@ use uuid::Uuid;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode};
-#[cfg(not(windows))]
-use ballista_core::cache_layer::{
- medium::local_disk::LocalDiskMedium, policy::file::FileCacheLayer,
CacheLayer,
-};
use ballista_core::config::{DataCachePolicy, LogRotationPolicy,
TaskSchedulingPolicy};
use ballista_core::error::BallistaError;
-#[cfg(not(windows))]
-use
ballista_core::object_store_registry::cache::CachedBasedObjectStoreRegistry;
use ballista_core::object_store_registry::with_object_store_registry;
use ballista_core::serde::protobuf::executor_resource::Resource;
use ballista_core::serde::protobuf::executor_status::Status;
@@ -195,50 +189,12 @@ pub async fn start_executor_process(opt:
Arc<ExecutorProcessConfig>) -> Result<(
})?)
};
- // Set the object store registry
- #[cfg(not(windows))]
- let runtime_with_data_cache = {
- let cache_dir = opt.cache_dir.clone();
- let cache_capacity = opt.cache_capacity;
- let cache_io_concurrency = opt.cache_io_concurrency;
- let cache_layer =
- opt.data_cache_policy
- .map(|data_cache_policy| match data_cache_policy {
- DataCachePolicy::LocalDiskFile => {
- let cache_dir = cache_dir.unwrap();
- let cache_layer = FileCacheLayer::new(
- cache_capacity as usize,
- cache_io_concurrency,
- LocalDiskMedium::new(cache_dir),
- );
- CacheLayer::LocalDiskFile(Arc::new(cache_layer))
- }
- });
- if let Some(cache_layer) = cache_layer {
- let registry = Arc::new(CachedBasedObjectStoreRegistry::new(
- runtime.object_store_registry.clone(),
- cache_layer,
- ));
- Some(Arc::new(RuntimeEnv {
- memory_pool: runtime.memory_pool.clone(),
- disk_manager: runtime.disk_manager.clone(),
- cache_manager: runtime.cache_manager.clone(),
- object_store_registry: registry,
- }))
- } else {
- None
- }
- };
- #[cfg(windows)]
- let runtime_with_data_cache = { None };
-
let metrics_collector = Arc::new(LoggingMetricsCollector::default());
let executor = Arc::new(Executor::new(
executor_meta,
&work_dir,
runtime,
- runtime_with_data_cache,
metrics_collector,
concurrent_tasks,
opt.execution_engine.clone(),
diff --git a/ballista/executor/src/executor_server.rs
b/ballista/executor/src/executor_server.rs
index 5754334c..6e3d5589 100644
--- a/ballista/executor/src/executor_server.rs
+++ b/ballista/executor/src/executor_server.rs
@@ -28,7 +28,6 @@ use log::{debug, error, info, warn};
use tonic::transport::Channel;
use tonic::{Request, Response, Status};
-use ballista_core::config::BALLISTA_DATA_CACHE_ENABLED;
use ballista_core::error::BallistaError;
use ballista_core::serde::protobuf::{
executor_grpc_server::{ExecutorGrpc, ExecutorGrpcServer},
@@ -344,10 +343,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> ExecutorServer<T,
let task_context = {
let task_props = task.props;
- let data_cache = task_props
- .get(BALLISTA_DATA_CACHE_ENABLED)
- .map(|data_cache| data_cache.parse().unwrap_or(false))
- .unwrap_or(false);
let mut config = ConfigOptions::new();
for (k, v) in task_props.iter() {
if let Err(e) = config.set(k, v) {
@@ -357,10 +352,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> ExecutorServer<T,
let session_config = SessionConfig::from(config);
let function_registry = task.function_registry;
- if data_cache {
- info!("Data cache will be enabled for {}", task_identity);
- }
- let runtime = self.executor.get_runtime(data_cache);
+ let runtime = self.executor.get_runtime();
Arc::new(TaskContext::new(
Some(task_identity.clone()),
@@ -649,7 +641,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> ExecutorGrpc
scheduler_id: scheduler_id.clone(),
task: get_task_definition(
task,
- self.executor.get_runtime(false),
+ self.executor.get_runtime(),
self.executor.scalar_functions.clone(),
self.executor.aggregate_functions.clone(),
self.executor.window_functions.clone(),
@@ -677,7 +669,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> ExecutorGrpc
for multi_task in multi_tasks {
let multi_task: Vec<TaskDefinition> = get_task_definition_vec(
multi_task,
- self.executor.get_runtime(false),
+ self.executor.get_runtime(),
self.executor.scalar_functions.clone(),
self.executor.aggregate_functions.clone(),
self.executor.window_functions.clone(),
diff --git a/ballista/executor/src/standalone.rs
b/ballista/executor/src/standalone.rs
index 971af517..38e27713 100644
--- a/ballista/executor/src/standalone.rs
+++ b/ballista/executor/src/standalone.rs
@@ -82,7 +82,6 @@ pub async fn new_standalone_executor<
executor_meta,
&work_dir,
Arc::new(RuntimeEnv::new(config).unwrap()),
- None,
Arc::new(LoggingMetricsCollector::default()),
concurrent_tasks,
None,
diff --git a/ballista/scheduler/src/cluster/mod.rs
b/ballista/scheduler/src/cluster/mod.rs
index 5b9ecedf..81432056 100644
--- a/ballista/scheduler/src/cluster/mod.rs
+++ b/ballista/scheduler/src/cluster/mod.rs
@@ -371,7 +371,6 @@ pub(crate) async fn bind_task_bias(
stage_attempt_num: running_stage.stage_attempt_num,
task_id,
task_attempt:
running_stage.task_failure_numbers[partition_id],
- data_cache: false,
plan: running_stage.plan.clone(),
};
schedulable_tasks.push((executor_id, task_desc));
@@ -460,7 +459,6 @@ pub(crate) async fn bind_task_round_robin(
stage_attempt_num: running_stage.stage_attempt_num,
task_id,
task_attempt:
running_stage.task_failure_numbers[partition_id],
- data_cache: false,
plan: running_stage.plan.clone(),
};
schedulable_tasks.push((executor_id, task_desc));
@@ -565,7 +563,6 @@ pub(crate) async fn bind_task_consistent_hash(
stage_id: running_stage.stage_id,
partition_id,
};
- let data_cache = tolerance == 0;
let task_desc = TaskDescription {
session_id: session_id.clone(),
partition,
@@ -573,7 +570,6 @@ pub(crate) async fn bind_task_consistent_hash(
task_id,
task_attempt: running_stage.task_failure_numbers
[partition_id],
- data_cache,
plan: running_stage.plan.clone(),
};
schedulable_tasks.push((executor_id, task_desc));
diff --git a/ballista/scheduler/src/state/execution_graph.rs
b/ballista/scheduler/src/state/execution_graph.rs
index e72e3906..9e50742f 100644
--- a/ballista/scheduler/src/state/execution_graph.rs
+++ b/ballista/scheduler/src/state/execution_graph.rs
@@ -906,7 +906,6 @@ impl ExecutionGraph {
stage_attempt_num: stage.stage_attempt_num,
task_id,
task_attempt,
- data_cache: false,
plan: stage.plan.clone(),
})
} else {
@@ -1456,7 +1455,6 @@ pub struct TaskDescription {
pub stage_attempt_num: usize,
pub task_id: usize,
pub task_attempt: usize,
- pub data_cache: bool,
pub plan: Arc<dyn ExecutionPlan>,
}
@@ -1465,7 +1463,7 @@ impl Debug for TaskDescription {
let plan =
DisplayableExecutionPlan::new(self.plan.as_ref()).indent(false);
write!(
f,
- "TaskDescription[session_id: {},job: {}, stage: {}.{}, partition:
{} task_id {}, task attempt {}, data cache {}]\n{}",
+ "TaskDescription[session_id: {},job: {}, stage: {}.{}, partition:
{} task_id {}, task attempt {}]\n{}",
self.session_id,
self.partition.job_id,
self.partition.stage_id,
@@ -1473,7 +1471,6 @@ impl Debug for TaskDescription {
self.partition.partition_id,
self.task_id,
self.task_attempt,
- self.data_cache,
plan
)
}
diff --git a/ballista/scheduler/src/state/task_manager.rs
b/ballista/scheduler/src/state/task_manager.rs
index 66714e6c..1c9d8dd6 100644
--- a/ballista/scheduler/src/state/task_manager.rs
+++ b/ballista/scheduler/src/state/task_manager.rs
@@ -27,8 +27,7 @@ use ballista_core::error::Result;
use crate::cluster::JobState;
use ballista_core::serde::protobuf::{
- job_status, JobStatus, KeyValuePair, MultiTaskDefinition, TaskDefinition,
TaskId,
- TaskStatus,
+ job_status, JobStatus, MultiTaskDefinition, TaskDefinition, TaskId,
TaskStatus,
};
use ballista_core::serde::scheduler::ExecutorMetadata;
use ballista_core::serde::BallistaCodec;
@@ -47,7 +46,6 @@ use std::time::Duration;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::RwLock;
-use ballista_core::config::BALLISTA_DATA_CACHE_ENABLED;
use tracing::trace;
type ActiveJobCache = Arc<DashMap<String, JobInfoCache>>;
@@ -496,13 +494,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
plan_buf
};
- let mut props = vec![];
- if task.data_cache {
- props.push(KeyValuePair {
- key: BALLISTA_DATA_CACHE_ENABLED.to_string(),
- value: "true".to_string(),
- });
- }
+ let props = vec![];
let task_definition = TaskDefinition {
task_id: task.task_id as u32,
@@ -596,53 +588,26 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
.unwrap()
.as_millis() as u64;
- let (tasks_with_data_cache, tasks_without_data_cache):
(Vec<_>, Vec<_>) =
- tasks.into_iter().partition(|task| task.data_cache);
-
let mut multi_tasks = vec![];
- if !tasks_with_data_cache.is_empty() {
- let task_ids = tasks_with_data_cache
- .into_iter()
- .map(|task| TaskId {
- task_id: task.task_id as u32,
- task_attempt_num: task.task_attempt as u32,
- partition_id: task.partition.partition_id as u32,
- })
- .collect();
- multi_tasks.push(MultiTaskDefinition {
- task_ids,
- job_id: job_id.clone(),
- stage_id: stage_id as u32,
- stage_attempt_num: stage_attempt_num as u32,
- plan: plan.clone(),
- session_id: session_id.clone(),
- launch_time,
- props: vec![KeyValuePair {
- key: BALLISTA_DATA_CACHE_ENABLED.to_string(),
- value: "true".to_string(),
- }],
- });
- }
- if !tasks_without_data_cache.is_empty() {
- let task_ids = tasks_without_data_cache
- .into_iter()
- .map(|task| TaskId {
- task_id: task.task_id as u32,
- task_attempt_num: task.task_attempt as u32,
- partition_id: task.partition.partition_id as u32,
- })
- .collect();
- multi_tasks.push(MultiTaskDefinition {
- task_ids,
- job_id,
- stage_id: stage_id as u32,
- stage_attempt_num: stage_attempt_num as u32,
- plan,
- session_id,
- launch_time,
- props: vec![],
- });
- }
+
+ let task_ids = tasks
+ .into_iter()
+ .map(|task| TaskId {
+ task_id: task.task_id as u32,
+ task_attempt_num: task.task_attempt as u32,
+ partition_id: task.partition.partition_id as u32,
+ })
+ .collect();
+ multi_tasks.push(MultiTaskDefinition {
+ task_ids,
+ job_id,
+ stage_id: stage_id as u32,
+ stage_attempt_num: stage_attempt_num as u32,
+ plan,
+ session_id,
+ launch_time,
+ props: vec![],
+ });
Ok(multi_tasks)
} else {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]