This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new f073367d remove key-value stores for scheduler persistence (#1077)
f073367d is described below

commit f073367da57ff066c3fc8855c7382330ad5553d0
Author: Marko Milenković <[email protected]>
AuthorDate: Sat Oct 12 15:51:43 2024 +0100

    remove key-value stores for scheduler persistence (#1077)
    
    only the in-memory store remains; if HA is required, another
    strategy such as check-pointing should be implemented
---
 .github/workflows/rust.yml                         |   2 -
 ballista/scheduler/Cargo.toml                      |   8 +-
 ballista/scheduler/scheduler_config_spec.toml      |  20 -
 ballista/scheduler/src/bin/main.rs                 |  18 +-
 ballista/scheduler/src/cluster/kv.rs               | 817 ---------------------
 ballista/scheduler/src/cluster/mod.rs              |  85 +--
 ballista/scheduler/src/cluster/storage/etcd.rs     | 346 ---------
 ballista/scheduler/src/cluster/storage/mod.rs      | 140 ----
 ballista/scheduler/src/cluster/storage/sled.rs     | 395 ----------
 ballista/scheduler/src/config.rs                   |   4 -
 ballista/scheduler/src/lib.rs                      |   1 -
 ballista/scheduler/src/scheduler_server/grpc.rs    |   2 +-
 ballista/scheduler/src/scheduler_server/mod.rs     |   2 +-
 ballista/scheduler/src/standalone.rs               |   9 +-
 ballista/scheduler/src/state/execution_graph.rs    | 173 +----
 .../src/state/execution_graph/execution_stage.rs   | 380 +---------
 16 files changed, 21 insertions(+), 2381 deletions(-)

diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index db1b7fc9..b1832b16 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -176,8 +176,6 @@ jobs:
           export ARROW_TEST_DATA=$(pwd)/testing/data
           export PARQUET_TEST_DATA=$(pwd)/parquet-testing/data
           cd ballista
-          # snmalloc requires cmake so build without default features
-          cargo test --no-default-features --features sled
           # Ensure also compiles in standalone mode
           cargo test --no-default-features --features standalone
         env:
diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml
index 596db6d1..04837849 100644
--- a/ballista/scheduler/Cargo.toml
+++ b/ballista/scheduler/Cargo.toml
@@ -34,11 +34,9 @@ name = "ballista-scheduler"
 path = "src/bin/main.rs"
 
 [features]
-default = ["etcd", "sled", "flight-sql"]
-etcd = ["etcd-client"]
+default = ["flight-sql"]
 flight-sql = []
 prometheus-metrics = ["prometheus", "once_cell"]
-sled = ["sled_package", "tokio-stream"]
 
 
 [dependencies]
@@ -54,7 +52,6 @@ configure_me = { workspace = true }
 dashmap = "5.4.0"
 datafusion = { workspace = true }
 datafusion-proto = { workspace = true }
-etcd-client = { version = "0.14", optional = true }
 flatbuffers = { version = "23.5.26" }
 futures = "0.3"
 graphviz-rust = "0.8.0"
@@ -70,9 +67,8 @@ prost = { workspace = true }
 prost-types = { workspace = true }
 rand = "0.8"
 serde = { version = "1", features = ["derive"] }
-sled_package = { package = "sled", version = "0.34", optional = true }
 tokio = { version = "1.0", features = ["full"] }
-tokio-stream = { version = "0.1", features = ["net"], optional = true }
+tokio-stream = { version = "0.1", features = ["net"] }
 tonic = { workspace = true }
 # tonic 0.12.2 depends on tower 0.4.7
 tower = { version = "0.4.7", default-features = false, features = ["make", 
"util"] }
diff --git a/ballista/scheduler/scheduler_config_spec.toml 
b/ballista/scheduler/scheduler_config_spec.toml
index 857212fe..804987d9 100644
--- a/ballista/scheduler/scheduler_config_spec.toml
+++ b/ballista/scheduler/scheduler_config_spec.toml
@@ -29,13 +29,6 @@ name = "advertise_flight_sql_endpoint"
 type = "String"
 doc = "Route for proxying flight results via scheduler. Should be of the form 
'IP:PORT'"
 
-[[param]]
-abbr = "b"
-name = "cluster_backend"
-type = "ballista_scheduler::cluster::ClusterStorage"
-doc = "The configuration backend for the scheduler cluster state, possible 
values: etcd, memory, sled. Default: sled"
-default = "ballista_scheduler::cluster::ClusterStorage::Sled"
-
 [[param]]
 abbr = "n"
 name = "namespace"
@@ -43,13 +36,6 @@ type = "String"
 doc = "Namespace for the ballista cluster that this executor will join. 
Default: ballista"
 default = "std::string::String::from(\"ballista\")"
 
-[[param]]
-abbr = "e"
-name = "etcd_urls"
-type = "String"
-doc = "etcd urls for use when discovery mode is `etcd`. Default: 
localhost:2379"
-default = "std::string::String::from(\"localhost:2379\")"
-
 [[param]]
 name = "bind_host"
 type = "String"
@@ -118,12 +104,6 @@ type = "String"
 doc = "plugin dir"
 default = "std::string::String::from(\"\")"
 
-[[param]]
-name = "sled_dir"
-type = "String"
-doc = "Sled dir: Opens a Db for saving schduler metadata at the specified 
path. This will create a new storage directory at the specified path if it does 
not already exist."
-default = "std::string::String::from(\"\")"
-
 [[param]]
 name = "log_dir"
 type = "String"
diff --git a/ballista/scheduler/src/bin/main.rs 
b/ballista/scheduler/src/bin/main.rs
index d2e2c9ce..7d8b4b1b 100644
--- a/ballista/scheduler/src/bin/main.rs
+++ b/ballista/scheduler/src/bin/main.rs
@@ -26,7 +26,6 @@ use crate::config::{Config, ResultExt};
 use ballista_core::config::LogRotationPolicy;
 use ballista_core::print_version;
 use ballista_scheduler::cluster::BallistaCluster;
-use ballista_scheduler::cluster::ClusterStorage;
 use ballista_scheduler::config::{
     ClusterStorageConfig, SchedulerConfig, TaskDistribution, 
TaskDistributionPolicy,
 };
@@ -116,22 +115,7 @@ async fn inner() -> Result<()> {
     let addr = format!("{}:{}", opt.bind_host, opt.bind_port);
     let addr = addr.parse()?;
 
-    let cluster_storage_config = match opt.cluster_backend {
-        ClusterStorage::Memory => ClusterStorageConfig::Memory,
-        ClusterStorage::Etcd => ClusterStorageConfig::Etcd(
-            opt.etcd_urls
-                .split_whitespace()
-                .map(|s| s.to_string())
-                .collect(),
-        ),
-        ClusterStorage::Sled => {
-            if opt.sled_dir.is_empty() {
-                ClusterStorageConfig::Sled(None)
-            } else {
-                ClusterStorageConfig::Sled(Some(opt.sled_dir))
-            }
-        }
-    };
+    let cluster_storage_config = ClusterStorageConfig::Memory;
 
     let task_distribution = match opt.task_distribution {
         TaskDistribution::Bias => TaskDistributionPolicy::Bias,
diff --git a/ballista/scheduler/src/cluster/kv.rs 
b/ballista/scheduler/src/cluster/kv.rs
deleted file mode 100644
index 58e56d9e..00000000
--- a/ballista/scheduler/src/cluster/kv.rs
+++ /dev/null
@@ -1,817 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::cluster::storage::{KeyValueStore, Keyspace, Lock, Operation, 
WatchEvent};
-use crate::cluster::{
-    bind_task_bias, bind_task_consistent_hash, bind_task_round_robin, 
get_scan_files,
-    is_skip_consistent_hash, BoundTask, ClusterState, ExecutorHeartbeatStream,
-    ExecutorSlot, JobState, JobStateEvent, JobStateEventStream, JobStatus,
-    TaskDistributionPolicy, TopologyNode,
-};
-use crate::scheduler_server::{timestamp_secs, SessionBuilder};
-use crate::state::execution_graph::ExecutionGraph;
-use crate::state::session_manager::create_datafusion_context;
-use crate::state::task_manager::JobInfoCache;
-use crate::state::{decode_into, decode_protobuf};
-use async_trait::async_trait;
-use ballista_core::config::BallistaConfig;
-use ballista_core::consistent_hash::node::Node;
-use ballista_core::error::{BallistaError, Result};
-use ballista_core::serde::protobuf::job_status::Status;
-use ballista_core::serde::protobuf::{
-    self, AvailableTaskSlots, ExecutorHeartbeat, ExecutorTaskSlots, FailedJob,
-    KeyValuePair, QueuedJob,
-};
-use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
-use ballista_core::serde::BallistaCodec;
-use dashmap::DashMap;
-use datafusion::physical_plan::ExecutionPlan;
-use datafusion::prelude::SessionContext;
-use datafusion_proto::logical_plan::AsLogicalPlan;
-use datafusion_proto::physical_plan::AsExecutionPlan;
-use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode};
-use futures::StreamExt;
-use itertools::Itertools;
-use log::{error, info, warn};
-use prost::Message;
-use std::collections::{HashMap, HashSet};
-use std::future::Future;
-use std::sync::Arc;
-
-/// State implementation based on underlying `KeyValueStore`
-pub struct KeyValueState<
-    S: KeyValueStore,
-    T: 'static + AsLogicalPlan = LogicalPlanNode,
-    U: 'static + AsExecutionPlan = PhysicalPlanNode,
-> {
-    /// Underlying `KeyValueStore`
-    store: S,
-    /// ExecutorMetadata cache, executor_id -> ExecutorMetadata
-    executors: Arc<DashMap<String, ExecutorMetadata>>,
-    /// ExecutorHeartbeat cache, executor_id -> ExecutorHeartbeat
-    executor_heartbeats: Arc<DashMap<String, ExecutorHeartbeat>>,
-    /// Codec used to serialize/deserialize execution plan
-    codec: BallistaCodec<T, U>,
-    /// Name of current scheduler. Should be `{host}:{port}`
-    #[allow(dead_code)]
-    scheduler: String,
-    /// In-memory store of queued jobs. Map from Job ID -> (Job Name, 
queued_at timestamp)
-    queued_jobs: DashMap<String, (String, u64)>,
-    //// `SessionBuilder` for constructing `SessionContext` from stored 
`BallistaConfig`
-    session_builder: SessionBuilder,
-}
-
-impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>
-    KeyValueState<S, T, U>
-{
-    pub fn new(
-        scheduler: impl Into<String>,
-        store: S,
-        codec: BallistaCodec<T, U>,
-        session_builder: SessionBuilder,
-    ) -> Self {
-        Self {
-            store,
-            executors: Arc::new(DashMap::new()),
-            executor_heartbeats: Arc::new(DashMap::new()),
-            scheduler: scheduler.into(),
-            codec,
-            queued_jobs: DashMap::new(),
-            session_builder,
-        }
-    }
-
-    /// Initialize the set of active executor heartbeats from storage
-    async fn init_active_executor_heartbeats(&self) -> Result<()> {
-        let heartbeats = self.store.scan(Keyspace::Heartbeats, None).await?;
-
-        for (_, value) in heartbeats {
-            let data: ExecutorHeartbeat = decode_protobuf(&value)?;
-            if let Some(protobuf::ExecutorStatus {
-                status: Some(protobuf::executor_status::Status::Active(_)),
-            }) = &data.status
-            {
-                self.executor_heartbeats
-                    .insert(data.executor_id.clone(), data);
-            }
-        }
-
-        Ok(())
-    }
-
-    /// Return the stream of executor heartbeats observed by all schedulers in 
the cluster.
-    /// This can be aggregated to provide an eventually consistent view of all 
executors within the cluster
-    async fn executor_heartbeat_stream(&self) -> 
Result<ExecutorHeartbeatStream> {
-        let events = self
-            .store
-            .watch(Keyspace::Heartbeats, String::default())
-            .await?;
-
-        Ok(events
-            .filter_map(|event| {
-                futures::future::ready(match event {
-                    WatchEvent::Put(_, value) => {
-                        if let Ok(heartbeat) =
-                            decode_protobuf::<ExecutorHeartbeat>(&value)
-                        {
-                            Some(heartbeat)
-                        } else {
-                            None
-                        }
-                    }
-                    WatchEvent::Delete(_) => None,
-                })
-            })
-            .boxed())
-    }
-
-    /// Get the topology nodes of the cluster for consistent hashing
-    fn get_topology_nodes(
-        &self,
-        available_slots: &[AvailableTaskSlots],
-        executors: Option<HashSet<String>>,
-    ) -> HashMap<String, TopologyNode> {
-        let mut nodes: HashMap<String, TopologyNode> = HashMap::new();
-        for slots in available_slots {
-            if let Some(executors) = executors.as_ref() {
-                if !executors.contains(&slots.executor_id) {
-                    continue;
-                }
-            }
-            if let Some(executor) = self.executors.get(&slots.executor_id) {
-                let node = TopologyNode::new(
-                    &executor.host,
-                    executor.port,
-                    &slots.executor_id,
-                    self.executor_heartbeats
-                        .get(&executor.id)
-                        .map(|heartbeat| heartbeat.timestamp)
-                        .unwrap_or(0),
-                    slots.slots,
-                );
-                if let Some(existing_node) = nodes.get(node.name()) {
-                    if existing_node.last_seen_ts < node.last_seen_ts {
-                        nodes.insert(node.name().to_string(), node);
-                    }
-                } else {
-                    nodes.insert(node.name().to_string(), node);
-                }
-            }
-        }
-        nodes
-    }
-}
-
-#[async_trait]
-impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>
-    ClusterState for KeyValueState<S, T, U>
-{
-    /// Initialize a background process that will listen for executor 
heartbeats and update the in-memory cache
-    /// of executor heartbeats
-    async fn init(&self) -> Result<()> {
-        self.init_active_executor_heartbeats().await?;
-
-        let mut heartbeat_stream = self.executor_heartbeat_stream().await?;
-
-        info!("Initializing heartbeat listener");
-
-        let heartbeats = self.executor_heartbeats.clone();
-        let executors = self.executors.clone();
-        tokio::task::spawn(async move {
-            while let Some(heartbeat) = heartbeat_stream.next().await {
-                let executor_id = heartbeat.executor_id.clone();
-
-                match heartbeat
-                    .status
-                    .as_ref()
-                    .and_then(|status| status.status.as_ref())
-                {
-                    Some(protobuf::executor_status::Status::Dead(_)) => {
-                        heartbeats.remove(&executor_id);
-                        executors.remove(&executor_id);
-                    }
-                    _ => {
-                        heartbeats.insert(executor_id, heartbeat);
-                    }
-                }
-            }
-        });
-
-        Ok(())
-    }
-
-    async fn bind_schedulable_tasks(
-        &self,
-        distribution: TaskDistributionPolicy,
-        active_jobs: Arc<HashMap<String, JobInfoCache>>,
-        executors: Option<HashSet<String>>,
-    ) -> Result<Vec<BoundTask>> {
-        let lock = self.store.lock(Keyspace::Slots, "global").await?;
-
-        with_lock(lock, async {
-            let resources = self.store.get(Keyspace::Slots, "all").await?;
-
-            let mut slots =
-                ExecutorTaskSlots::decode(resources.as_slice()).map_err(|err| {
-                    BallistaError::Internal(format!(
-                        "Unexpected value in executor slots state: {err:?}"
-                    ))
-                })?;
-
-            let available_slots: Vec<&mut AvailableTaskSlots> = slots
-                .task_slots
-                .iter_mut()
-                .filter_map(|data| {
-                    (data.slots > 0
-                        && executors
-                            .as_ref()
-                            .map(|executors| 
executors.contains(&data.executor_id))
-                            .unwrap_or(true))
-                    .then_some(data)
-                })
-                .collect();
-
-            let bound_tasks = match distribution {
-                TaskDistributionPolicy::Bias => {
-                    bind_task_bias(available_slots, active_jobs, |_| 
false).await
-                }
-                TaskDistributionPolicy::RoundRobin => {
-                    bind_task_round_robin(available_slots, active_jobs, |_| 
false).await
-                }
-                TaskDistributionPolicy::ConsistentHash {
-                    num_replicas,
-                    tolerance,
-                } => {
-                    let mut bound_tasks = bind_task_round_robin(
-                        available_slots,
-                        active_jobs.clone(),
-                        |stage_plan: Arc<dyn ExecutionPlan>| {
-                            if let Ok(scan_files) = get_scan_files(stage_plan) 
{
-                                // Should be opposite to consistent hash ones.
-                                !is_skip_consistent_hash(&scan_files)
-                            } else {
-                                false
-                            }
-                        },
-                    )
-                    .await;
-                    info!("{} tasks bound by round robin policy", 
bound_tasks.len());
-                    let (bound_tasks_consistent_hash, ch_topology) =
-                        bind_task_consistent_hash(
-                            self.get_topology_nodes(&slots.task_slots, 
executors),
-                            num_replicas,
-                            tolerance,
-                            active_jobs,
-                            |_, plan| get_scan_files(plan),
-                        )
-                        .await?;
-                    info!(
-                        "{} tasks bound by consistent hashing policy",
-                        bound_tasks_consistent_hash.len()
-                    );
-                    if !bound_tasks_consistent_hash.is_empty() {
-                        bound_tasks.extend(bound_tasks_consistent_hash);
-                        // Update the available slots
-                        let mut executor_data: HashMap<String, 
AvailableTaskSlots> =
-                            slots
-                                .task_slots
-                                .into_iter()
-                                .map(|slots| (slots.executor_id.clone(), 
slots))
-                                .collect();
-                        let ch_topology = ch_topology.unwrap();
-                        for node in ch_topology.nodes() {
-                            if let Some(data) = 
executor_data.get_mut(&node.id) {
-                                data.slots = node.available_slots;
-                            } else {
-                                error!("Fail to find executor data for {}", 
&node.id);
-                            }
-                        }
-                        slots.task_slots = 
executor_data.into_values().collect();
-                    }
-                    bound_tasks
-                }
-            };
-
-            if !bound_tasks.is_empty() {
-                self.store
-                    .put(Keyspace::Slots, "all".to_owned(), 
slots.encode_to_vec())
-                    .await?
-            }
-
-            Ok(bound_tasks)
-        })
-        .await
-    }
-
-    async fn unbind_tasks(&self, executor_slots: Vec<ExecutorSlot>) -> 
Result<()> {
-        let mut increments = HashMap::new();
-        for (executor_id, num_slots) in executor_slots {
-            let v = increments.entry(executor_id).or_insert_with(|| 0);
-            *v += num_slots;
-        }
-
-        let lock = self.store.lock(Keyspace::Slots, "all").await?;
-
-        with_lock(lock, async {
-            let resources = self.store.get(Keyspace::Slots, "all").await?;
-
-            let mut slots =
-                ExecutorTaskSlots::decode(resources.as_slice()).map_err(|err| {
-                    BallistaError::Internal(format!(
-                        "Unexpected value in executor slots state: {err:?}"
-                    ))
-                })?;
-
-            for executor_slots in slots.task_slots.iter_mut() {
-                if let Some(slots) = 
increments.get(&executor_slots.executor_id) {
-                    executor_slots.slots += *slots;
-                }
-            }
-
-            self.store
-                .put(Keyspace::Slots, "all".to_string(), slots.encode_to_vec())
-                .await
-        })
-        .await
-    }
-
-    async fn register_executor(
-        &self,
-        metadata: ExecutorMetadata,
-        spec: ExecutorData,
-    ) -> Result<()> {
-        let executor_id = metadata.id.clone();
-
-        //TODO this should be in a transaction
-        // Now that we know we can connect, save the metadata and slots
-        self.save_executor_metadata(metadata).await?;
-        self.save_executor_heartbeat(ExecutorHeartbeat {
-            executor_id: executor_id.clone(),
-            timestamp: timestamp_secs(),
-            metrics: vec![],
-            status: Some(protobuf::ExecutorStatus {
-                status: Some(
-                    
protobuf::executor_status::Status::Active(String::default()),
-                ),
-            }),
-        })
-        .await?;
-
-        let available_slots = AvailableTaskSlots {
-            executor_id,
-            slots: spec.available_task_slots,
-        };
-
-        let lock = self.store.lock(Keyspace::Slots, "all").await?;
-
-        with_lock(lock, async {
-            let current_slots = self.store.get(Keyspace::Slots, "all").await?;
-
-            let mut current_slots: ExecutorTaskSlots =
-                decode_protobuf(current_slots.as_slice())?;
-
-            if let Some((idx, _)) = current_slots
-                .task_slots
-                .iter()
-                .find_position(|slots| slots.executor_id == 
available_slots.executor_id)
-            {
-                current_slots.task_slots[idx] = available_slots;
-            } else {
-                current_slots.task_slots.push(available_slots);
-            }
-
-            self.store
-                .put(
-                    Keyspace::Slots,
-                    "all".to_string(),
-                    current_slots.encode_to_vec(),
-                )
-                .await
-        })
-        .await?;
-
-        Ok(())
-    }
-
-    async fn save_executor_metadata(&self, metadata: ExecutorMetadata) -> 
Result<()> {
-        let executor_id = metadata.id.clone();
-
-        let proto: protobuf::ExecutorMetadata = metadata.clone().into();
-        self.store
-            .put(
-                Keyspace::Executors,
-                executor_id.clone(),
-                proto.encode_to_vec(),
-            )
-            .await?;
-
-        self.executors.insert(executor_id, metadata);
-
-        Ok(())
-    }
-
-    async fn get_executor_metadata(&self, executor_id: &str) -> 
Result<ExecutorMetadata> {
-        let metadata = if let Some(metadata) = self.executors.get(executor_id) 
{
-            metadata.value().clone()
-        } else {
-            let value = self.store.get(Keyspace::Executors, 
executor_id).await?;
-            if value.is_empty() {
-                return Err(BallistaError::Internal(format!(
-                    "Executor {} not registered",
-                    executor_id
-                )));
-            }
-            let decoded =
-                decode_into::<protobuf::ExecutorMetadata, 
ExecutorMetadata>(&value)?;
-            self.executors
-                .insert(executor_id.to_string(), decoded.clone());
-
-            decoded
-        };
-
-        Ok(metadata)
-    }
-
-    async fn save_executor_heartbeat(&self, heartbeat: ExecutorHeartbeat) -> 
Result<()> {
-        let executor_id = heartbeat.executor_id.clone();
-        self.store
-            .put(
-                Keyspace::Heartbeats,
-                executor_id.clone(),
-                heartbeat.clone().encode_to_vec(),
-            )
-            .await?;
-        self.executor_heartbeats.insert(executor_id, heartbeat);
-        Ok(())
-    }
-
-    async fn remove_executor(&self, executor_id: &str) -> Result<()> {
-        let value = ExecutorHeartbeat {
-            executor_id: executor_id.to_owned(),
-            timestamp: timestamp_secs(),
-            metrics: vec![],
-            status: Some(protobuf::ExecutorStatus {
-                status: 
Some(protobuf::executor_status::Status::Dead("".to_string())),
-            }),
-        }
-        .encode_to_vec();
-
-        self.store
-            .put(Keyspace::Heartbeats, executor_id.to_owned(), value)
-            .await?;
-        self.executor_heartbeats.remove(executor_id);
-
-        // TODO Check the Executor reservation logic for push-based scheduling
-
-        Ok(())
-    }
-
-    fn executor_heartbeats(&self) -> HashMap<String, ExecutorHeartbeat> {
-        self.executor_heartbeats
-            .iter()
-            .map(|r| (r.key().clone(), r.value().clone()))
-            .collect()
-    }
-
-    fn get_executor_heartbeat(&self, executor_id: &str) -> 
Option<ExecutorHeartbeat> {
-        self.executor_heartbeats
-            .get(executor_id)
-            .map(|r| r.value().clone())
-    }
-}
-
-#[async_trait]
-impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> JobState
-    for KeyValueState<S, T, U>
-{
-    fn accept_job(&self, job_id: &str, job_name: &str, queued_at: u64) -> 
Result<()> {
-        self.queued_jobs
-            .insert(job_id.to_string(), (job_name.to_string(), queued_at));
-
-        Ok(())
-    }
-
-    fn pending_job_number(&self) -> usize {
-        self.queued_jobs.len()
-    }
-
-    async fn submit_job(&self, job_id: String, graph: &ExecutionGraph) -> 
Result<()> {
-        if self.queued_jobs.get(&job_id).is_some() {
-            let status = graph.status();
-            let encoded_graph =
-                ExecutionGraph::encode_execution_graph(graph.clone(), 
&self.codec)?;
-
-            self.store
-                .apply_txn(vec![
-                    (
-                        Operation::Put(status.encode_to_vec()),
-                        Keyspace::JobStatus,
-                        job_id.clone(),
-                    ),
-                    (
-                        Operation::Put(encoded_graph.encode_to_vec()),
-                        Keyspace::ExecutionGraph,
-                        job_id.clone(),
-                    ),
-                ])
-                .await?;
-
-            self.queued_jobs.remove(&job_id);
-
-            Ok(())
-        } else {
-            Err(BallistaError::Internal(format!(
-                "Failed to submit job {job_id}, job was not in queueud jobs"
-            )))
-        }
-    }
-
-    async fn get_jobs(&self) -> Result<HashSet<String>> {
-        self.store.scan_keys(Keyspace::JobStatus).await
-    }
-
-    async fn get_job_status(&self, job_id: &str) -> Result<Option<JobStatus>> {
-        if let Some((job_name, queued_at)) = 
self.queued_jobs.get(job_id).as_deref() {
-            Ok(Some(JobStatus {
-                job_id: job_id.to_string(),
-                job_name: job_name.clone(),
-                status: Some(Status::Queued(QueuedJob {
-                    queued_at: *queued_at,
-                })),
-            }))
-        } else {
-            let value = self.store.get(Keyspace::JobStatus, job_id).await?;
-
-            (!value.is_empty())
-                .then(|| decode_protobuf(value.as_slice()))
-                .transpose()
-        }
-    }
-
-    async fn get_execution_graph(&self, job_id: &str) -> 
Result<Option<ExecutionGraph>> {
-        let value = self.store.get(Keyspace::ExecutionGraph, job_id).await?;
-
-        if value.is_empty() {
-            return Ok(None);
-        }
-
-        let proto: protobuf::ExecutionGraph = 
decode_protobuf(value.as_slice())?;
-
-        let session = self.get_session(&proto.session_id).await?;
-
-        Ok(Some(
-            ExecutionGraph::decode_execution_graph(proto, &self.codec, 
session.as_ref())
-                .await?,
-        ))
-    }
-
-    async fn save_job(&self, job_id: &str, graph: &ExecutionGraph) -> 
Result<()> {
-        let status = graph.status();
-        let encoded_graph =
-            ExecutionGraph::encode_execution_graph(graph.clone(), 
&self.codec)?;
-
-        self.store
-            .apply_txn(vec![
-                (
-                    Operation::Put(status.encode_to_vec()),
-                    Keyspace::JobStatus,
-                    job_id.to_string(),
-                ),
-                (
-                    Operation::Put(encoded_graph.encode_to_vec()),
-                    Keyspace::ExecutionGraph,
-                    job_id.to_string(),
-                ),
-            ])
-            .await
-    }
-
-    async fn fail_unscheduled_job(&self, job_id: &str, reason: String) -> 
Result<()> {
-        if let Some((job_id, (job_name, queued_at))) = 
self.queued_jobs.remove(job_id) {
-            let status = JobStatus {
-                job_id: job_id.clone(),
-                job_name,
-                status: Some(Status::Failed(FailedJob {
-                    error: reason,
-                    queued_at,
-                    started_at: 0,
-                    ended_at: 0,
-                })),
-            };
-
-            self.store
-                .put(Keyspace::JobStatus, job_id, status.encode_to_vec())
-                .await
-        } else {
-            Err(BallistaError::Internal(format!(
-                "Could not fail unscheduled job {job_id}, not found in queued 
jobs"
-            )))
-        }
-    }
-
-    async fn remove_job(&self, job_id: &str) -> Result<()> {
-        if self.queued_jobs.remove(job_id).is_none() {
-            self.store
-                .apply_txn(vec![
-                    (Operation::Delete, Keyspace::JobStatus, 
job_id.to_string()),
-                    (
-                        Operation::Delete,
-                        Keyspace::ExecutionGraph,
-                        job_id.to_string(),
-                    ),
-                ])
-                .await
-        } else {
-            Ok(())
-        }
-    }
-
-    async fn try_acquire_job(&self, _job_id: &str) -> 
Result<Option<ExecutionGraph>> {
-        Err(BallistaError::NotImplemented(
-            "Work stealing is not currently implemented".to_string(),
-        ))
-    }
-
-    async fn job_state_events(&self) -> Result<JobStateEventStream> {
-        let watch = self
-            .store
-            .watch(Keyspace::JobStatus, String::default())
-            .await?;
-
-        let stream = watch
-            .filter_map(|event| {
-                futures::future::ready(match event {
-                    WatchEvent::Put(key, value) => {
-                        if let Some(job_id) = 
Keyspace::JobStatus.strip_prefix(&key) {
-                            match JobStatus::decode(value.as_slice()) {
-                                Ok(status) => Some(JobStateEvent::JobUpdated {
-                                    job_id: job_id.to_string(),
-                                    status,
-                                }),
-                                Err(err) => {
-                                    warn!(
-                                    "Error decoding job status from watch 
event: {err:?}"
-                                );
-                                    None
-                                }
-                            }
-                        } else {
-                            None
-                        }
-                    }
-                    _ => None,
-                })
-            })
-            .boxed();
-
-        Ok(stream)
-    }
-
-    async fn get_session(&self, session_id: &str) -> 
Result<Arc<SessionContext>> {
-        let value = self.store.get(Keyspace::Sessions, session_id).await?;
-
-        let settings: protobuf::SessionSettings = decode_protobuf(&value)?;
-
-        let mut config_builder = BallistaConfig::builder();
-        for kv_pair in &settings.configs {
-            config_builder = config_builder.set(&kv_pair.key, &kv_pair.value);
-        }
-        let config = config_builder.build()?;
-
-        Ok(create_datafusion_context(&config, self.session_builder))
-    }
-
-    async fn create_session(
-        &self,
-        config: &BallistaConfig,
-    ) -> Result<Arc<SessionContext>> {
-        let mut settings: Vec<KeyValuePair> = vec![];
-
-        for (key, value) in config.settings() {
-            settings.push(KeyValuePair {
-                key: key.clone(),
-                value: value.clone(),
-            })
-        }
-
-        let value = protobuf::SessionSettings { configs: settings };
-
-        let session = create_datafusion_context(config, self.session_builder);
-
-        self.store
-            .put(
-                Keyspace::Sessions,
-                session.session_id(),
-                value.encode_to_vec(),
-            )
-            .await?;
-
-        Ok(session)
-    }
-
-    async fn update_session(
-        &self,
-        session_id: &str,
-        config: &BallistaConfig,
-    ) -> Result<Arc<SessionContext>> {
-        let mut settings: Vec<KeyValuePair> = vec![];
-
-        for (key, value) in config.settings() {
-            settings.push(KeyValuePair {
-                key: key.clone(),
-                value: value.clone(),
-            })
-        }
-
-        let value = protobuf::SessionSettings { configs: settings };
-        self.store
-            .put(
-                Keyspace::Sessions,
-                session_id.to_owned(),
-                value.encode_to_vec(),
-            )
-            .await?;
-
-        Ok(create_datafusion_context(config, self.session_builder))
-    }
-
-    async fn remove_session(
-        &self,
-        session_id: &str,
-    ) -> Result<Option<Arc<SessionContext>>> {
-        let session_ctx = self.get_session(session_id).await.ok();
-
-        self.store.delete(Keyspace::Sessions, session_id).await?;
-
-        Ok(session_ctx)
-    }
-}
-
-async fn with_lock<Out, F: Future<Output = Out>>(mut lock: Box<dyn Lock>, op: 
F) -> Out {
-    let result = op.await;
-    lock.unlock().await;
-    result
-}
-
-#[cfg(test)]
-mod test {
-
-    use crate::cluster::kv::KeyValueState;
-    use crate::cluster::storage::sled::SledClient;
-    use crate::cluster::test_util::{test_job_lifecycle, 
test_job_planning_failure};
-    use crate::test_utils::{
-        test_aggregation_plan, test_join_plan, test_two_aggregations_plan,
-    };
-    use ballista_core::error::Result;
-    use ballista_core::serde::BallistaCodec;
-    use ballista_core::utils::default_session_builder;
-
-    #[cfg(feature = "sled")]
-    #[tokio::test]
-    async fn test_sled_job_lifecycle() -> Result<()> {
-        test_job_lifecycle(make_sled_state()?, 
test_aggregation_plan(4).await).await?;
-        test_job_lifecycle(make_sled_state()?, 
test_two_aggregations_plan(4).await)
-            .await?;
-        test_job_lifecycle(make_sled_state()?, test_join_plan(4).await).await?;
-        Ok(())
-    }
-
-    #[cfg(feature = "sled")]
-    #[tokio::test]
-    async fn test_in_memory_job_planning_failure() -> Result<()> {
-        test_job_planning_failure(make_sled_state()?, 
test_aggregation_plan(4).await)
-            .await?;
-        test_job_planning_failure(
-            make_sled_state()?,
-            test_two_aggregations_plan(4).await,
-        )
-        .await?;
-        test_job_planning_failure(make_sled_state()?, 
test_join_plan(4).await).await?;
-
-        Ok(())
-    }
-
-    #[cfg(feature = "sled")]
-    fn make_sled_state() -> Result<KeyValueState<SledClient>> {
-        Ok(KeyValueState::new(
-            "",
-            SledClient::try_new_temporary()?,
-            BallistaCodec::default(),
-            default_session_builder,
-        ))
-    }
-}
diff --git a/ballista/scheduler/src/cluster/mod.rs 
b/ballista/scheduler/src/cluster/mod.rs
index b7489a25..5b9ecedf 100644
--- a/ballista/scheduler/src/cluster/mod.rs
+++ b/ballista/scheduler/src/cluster/mod.rs
@@ -28,36 +28,28 @@ use datafusion::datasource::physical_plan::{AvroExec, 
CsvExec, NdJsonExec, Parqu
 use datafusion::error::DataFusionError;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::SessionContext;
-use datafusion_proto::logical_plan::AsLogicalPlan;
-use datafusion_proto::physical_plan::AsExecutionPlan;
 use futures::Stream;
 use log::{debug, info, warn};
 
 use ballista_core::config::BallistaConfig;
 use ballista_core::consistent_hash;
 use ballista_core::consistent_hash::ConsistentHash;
-use ballista_core::error::{BallistaError, Result};
+use ballista_core::error::Result;
 use ballista_core::serde::protobuf::{
     job_status, AvailableTaskSlots, ExecutorHeartbeat, JobStatus,
 };
 use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata, 
PartitionId};
-use ballista_core::serde::BallistaCodec;
 use ballista_core::utils::default_session_builder;
 
-use crate::cluster::kv::KeyValueState;
 use crate::cluster::memory::{InMemoryClusterState, InMemoryJobState};
-use crate::cluster::storage::etcd::EtcdClient;
-use crate::cluster::storage::sled::SledClient;
-use crate::cluster::storage::KeyValueStore;
+
 use crate::config::{ClusterStorageConfig, SchedulerConfig, 
TaskDistributionPolicy};
 use crate::scheduler_server::SessionBuilder;
 use crate::state::execution_graph::{create_task_info, ExecutionGraph, 
TaskDescription};
 use crate::state::task_manager::JobInfoCache;
 
 pub mod event;
-pub mod kv;
 pub mod memory;
-pub mod storage;
 
 #[cfg(test)]
 #[allow(clippy::uninlined_format_args)]
@@ -67,9 +59,7 @@ pub mod test_util;
 // needs to be visible to code generated by configure_me
 #[derive(Debug, Clone, ValueEnum, serde::Deserialize, PartialEq, Eq)]
 pub enum ClusterStorage {
-    Etcd,
     Memory,
-    Sled,
 }
 
 impl std::str::FromStr for ClusterStorage {
@@ -113,81 +103,10 @@ impl BallistaCluster {
         }
     }
 
-    pub fn new_kv<
-        S: KeyValueStore,
-        T: 'static + AsLogicalPlan,
-        U: 'static + AsExecutionPlan,
-    >(
-        store: S,
-        scheduler: impl Into<String>,
-        session_builder: SessionBuilder,
-        codec: BallistaCodec<T, U>,
-    ) -> Self {
-        let kv_state =
-            Arc::new(KeyValueState::new(scheduler, store, codec, 
session_builder));
-        Self {
-            cluster_state: kv_state.clone(),
-            job_state: kv_state,
-        }
-    }
-
     pub async fn new_from_config(config: &SchedulerConfig) -> Result<Self> {
         let scheduler = config.scheduler_name();
 
         match &config.cluster_storage {
-            #[cfg(feature = "etcd")]
-            ClusterStorageConfig::Etcd(urls) => {
-                let etcd = etcd_client::Client::connect(urls.as_slice(), None)
-                    .await
-                    .map_err(|err| {
-                        BallistaError::Internal(format!(
-                            "Could not connect to etcd: {err:?}"
-                        ))
-                    })?;
-
-                Ok(Self::new_kv(
-                    EtcdClient::new(config.namespace.clone(), etcd),
-                    scheduler,
-                    default_session_builder,
-                    BallistaCodec::default(),
-                ))
-            }
-            #[cfg(not(feature = "etcd"))]
-            StateBackend::Etcd => {
-                unimplemented!(
-                    "build the scheduler with the `etcd` feature to use the 
etcd config backend"
-                )
-            }
-            #[cfg(feature = "sled")]
-            ClusterStorageConfig::Sled(dir) => {
-                if let Some(dir) = dir.as_ref() {
-                    info!("Initializing Sled database in directory {}", dir);
-                    let sled = SledClient::try_new(dir)?;
-
-                    Ok(Self::new_kv(
-                        sled,
-                        scheduler,
-                        default_session_builder,
-                        BallistaCodec::default(),
-                    ))
-                } else {
-                    info!("Initializing Sled database in temp directory");
-                    let sled = SledClient::try_new_temporary()?;
-
-                    Ok(Self::new_kv(
-                        sled,
-                        scheduler,
-                        default_session_builder,
-                        BallistaCodec::default(),
-                    ))
-                }
-            }
-            #[cfg(not(feature = "sled"))]
-            StateBackend::Sled => {
-                unimplemented!(
-                    "build the scheduler with the `sled` feature to use the 
sled config backend"
-                )
-            }
             ClusterStorageConfig::Memory => Ok(BallistaCluster::new_memory(
                 scheduler,
                 default_session_builder,
diff --git a/ballista/scheduler/src/cluster/storage/etcd.rs 
b/ballista/scheduler/src/cluster/storage/etcd.rs
deleted file mode 100644
index 514511c9..00000000
--- a/ballista/scheduler/src/cluster/storage/etcd.rs
+++ /dev/null
@@ -1,346 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::collections::HashSet;
-
-use std::task::Poll;
-
-use async_trait::async_trait;
-use ballista_core::error::{ballista_error, Result};
-use std::time::Instant;
-
-use crate::cluster::storage::KeyValueStore;
-use etcd_client::{
-    GetOptions, LockOptions, LockResponse, Txn, TxnOp, WatchOptions, 
WatchStream, Watcher,
-};
-use futures::{Stream, StreamExt};
-use log::{debug, error, warn};
-
-use crate::cluster::storage::{Keyspace, Lock, Operation, Watch, WatchEvent};
-
-/// A [`StateBackendClient`] implementation that uses etcd to save cluster 
state.
-#[derive(Clone)]
-pub struct EtcdClient {
-    namespace: String,
-    etcd: etcd_client::Client,
-}
-
-impl EtcdClient {
-    pub fn new(namespace: String, etcd: etcd_client::Client) -> Self {
-        Self { namespace, etcd }
-    }
-}
-
-#[async_trait]
-impl KeyValueStore for EtcdClient {
-    async fn get(&self, keyspace: Keyspace, key: &str) -> Result<Vec<u8>> {
-        let key = format!("/{}/{:?}/{}", self.namespace, keyspace, key);
-
-        Ok(self
-            .etcd
-            .clone()
-            .get(key, None)
-            .await
-            .map_err(|e| ballista_error(&format!("etcd error {e:?}")))?
-            .kvs()
-            .first()
-            .map(|kv| kv.value().to_owned())
-            .unwrap_or_default())
-    }
-
-    async fn get_from_prefix(
-        &self,
-        keyspace: Keyspace,
-        prefix: &str,
-    ) -> Result<Vec<(String, Vec<u8>)>> {
-        let prefix = format!("/{}/{:?}/{}", self.namespace, keyspace, prefix);
-
-        Ok(self
-            .etcd
-            .clone()
-            .get(prefix, Some(GetOptions::new().with_prefix()))
-            .await
-            .map_err(|e| ballista_error(&format!("etcd error {e:?}")))?
-            .kvs()
-            .iter()
-            .map(|kv| (kv.key_str().unwrap().to_owned(), 
kv.value().to_owned()))
-            .collect())
-    }
-
-    async fn scan(
-        &self,
-        keyspace: Keyspace,
-        limit: Option<usize>,
-    ) -> Result<Vec<(String, Vec<u8>)>> {
-        let prefix = format!("/{}/{:?}/", self.namespace, keyspace);
-
-        let options = if let Some(limit) = limit {
-            GetOptions::new().with_prefix().with_limit(limit as i64)
-        } else {
-            GetOptions::new().with_prefix()
-        };
-
-        Ok(self
-            .etcd
-            .clone()
-            .get(prefix, Some(options))
-            .await
-            .map_err(|e| ballista_error(&format!("etcd error {e:?}")))?
-            .kvs()
-            .iter()
-            .map(|kv| (kv.key_str().unwrap().to_owned(), 
kv.value().to_owned()))
-            .collect())
-    }
-
-    async fn scan_keys(&self, keyspace: Keyspace) -> Result<HashSet<String>> {
-        let prefix = format!("/{}/{:?}/", self.namespace, keyspace);
-
-        let options = GetOptions::new().with_prefix().with_keys_only();
-
-        Ok(self
-            .etcd
-            .clone()
-            .get(prefix.clone(), Some(options))
-            .await
-            .map_err(|e| ballista_error(&format!("etcd error {e:?}")))?
-            .kvs()
-            .iter()
-            .map(|kv| {
-                kv.key_str()
-                    .unwrap()
-                    .strip_prefix(&prefix)
-                    .unwrap()
-                    .to_owned()
-            })
-            .collect())
-    }
-
-    async fn put(&self, keyspace: Keyspace, key: String, value: Vec<u8>) -> 
Result<()> {
-        let key = format!("/{}/{:?}/{}", self.namespace, keyspace, key);
-
-        let mut etcd = self.etcd.clone();
-        etcd.put(key, value.clone(), None)
-            .await
-            .map_err(|e| {
-                warn!("etcd put failed: {}", e);
-                ballista_error(&format!("etcd put failed: {e}"))
-            })
-            .map(|_| ())
-    }
-
-    /// Apply multiple operations in a single transaction.
-    async fn apply_txn(&self, ops: Vec<(Operation, Keyspace, String)>) -> 
Result<()> {
-        let mut etcd = self.etcd.clone();
-
-        let txn_ops: Vec<TxnOp> = ops
-            .into_iter()
-            .map(|(operation, ks, key)| {
-                let key = format!("/{}/{:?}/{}", self.namespace, ks, key);
-                match operation {
-                    Operation::Put(value) => TxnOp::put(key, value, None),
-                    Operation::Delete => TxnOp::delete(key, None),
-                }
-            })
-            .collect();
-
-        etcd.txn(Txn::new().and_then(txn_ops))
-            .await
-            .map_err(|e| {
-                error!("etcd operation failed: {}", e);
-                ballista_error(&format!("etcd operation failed: {e}"))
-            })
-            .map(|_| ())
-    }
-
-    async fn mv(
-        &self,
-        from_keyspace: Keyspace,
-        to_keyspace: Keyspace,
-        key: &str,
-    ) -> Result<()> {
-        let mut etcd = self.etcd.clone();
-        let from_key = format!("/{}/{:?}/{}", self.namespace, from_keyspace, 
key);
-        let to_key = format!("/{}/{:?}/{}", self.namespace, to_keyspace, key);
-
-        let current_value = etcd
-            .get(from_key.as_str(), None)
-            .await
-            .map_err(|e| ballista_error(&format!("etcd error {e:?}")))?
-            .kvs()
-            .first()
-            .map(|kv| kv.value().to_owned());
-
-        if let Some(value) = current_value {
-            let txn = Txn::new().and_then(vec![
-                TxnOp::delete(from_key.as_str(), None),
-                TxnOp::put(to_key.as_str(), value, None),
-            ]);
-            etcd.txn(txn).await.map_err(|e| {
-                error!("etcd put failed: {}", e);
-                ballista_error("etcd move failed")
-            })?;
-        } else {
-            warn!("Cannot move value at {}, does not exist", from_key);
-        }
-
-        Ok(())
-    }
-
-    async fn lock(&self, keyspace: Keyspace, key: &str) -> Result<Box<dyn 
Lock>> {
-        let start = Instant::now();
-        let mut etcd = self.etcd.clone();
-
-        let lock_id = format!("/{}/mutex/{:?}/{}", self.namespace, keyspace, 
key);
-
-        // Create a lease which expires after 30 seconds. We then associate 
this lease with the lock
-        // acquired below. This protects against a scheduler dying 
unexpectedly while holding locks
-        // on shared resources. In that case, those locks would expire once 
the lease expires.
-        // TODO This is not great to do for every lock. We should have a 
single lease per scheduler instance
-        let lease_id = etcd
-            .lease_client()
-            .grant(30, None)
-            .await
-            .map_err(|e| {
-                warn!("etcd lease failed: {}", e);
-                ballista_error("etcd lease failed")
-            })?
-            .id();
-
-        let lock_options = LockOptions::new().with_lease(lease_id);
-
-        let lock = etcd
-            .lock(lock_id.as_str(), Some(lock_options))
-            .await
-            .map_err(|e| {
-                warn!("etcd lock failed: {}", e);
-                ballista_error("etcd lock failed")
-            })?;
-
-        let elapsed = start.elapsed();
-        debug!("Acquired lock {} in {:?}", lock_id, elapsed);
-        Ok(Box::new(EtcdLockGuard { etcd, lock }))
-    }
-
-    async fn watch(&self, keyspace: Keyspace, prefix: String) -> 
Result<Box<dyn Watch>> {
-        let prefix = format!("/{}/{:?}/{}", self.namespace, keyspace, prefix);
-
-        let mut etcd = self.etcd.clone();
-        let options = WatchOptions::new().with_prefix();
-        let (watcher, stream) = etcd.watch(prefix, 
Some(options)).await.map_err(|e| {
-            warn!("etcd watch failed: {}", e);
-            ballista_error("etcd watch failed")
-        })?;
-        Ok(Box::new(EtcdWatch {
-            watcher,
-            stream,
-            buffered_events: Vec::new(),
-        }))
-    }
-
-    async fn delete(&self, keyspace: Keyspace, key: &str) -> Result<()> {
-        let key = format!("/{}/{:?}/{}", self.namespace, keyspace, key);
-
-        let mut etcd = self.etcd.clone();
-
-        etcd.delete(key, None).await.map_err(|e| {
-            warn!("etcd delete failed: {:?}", e);
-            ballista_error("etcd delete failed")
-        })?;
-
-        Ok(())
-    }
-}
-
-struct EtcdWatch {
-    watcher: Watcher,
-    stream: WatchStream,
-    buffered_events: Vec<WatchEvent>,
-}
-
-#[tonic::async_trait]
-impl Watch for EtcdWatch {
-    async fn cancel(&mut self) -> Result<()> {
-        self.watcher.cancel().await.map_err(|e| {
-            warn!("etcd watch cancel failed: {}", e);
-            ballista_error("etcd watch cancel failed")
-        })
-    }
-}
-
-impl Stream for EtcdWatch {
-    type Item = WatchEvent;
-
-    fn poll_next(
-        self: std::pin::Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> Poll<Option<Self::Item>> {
-        let self_mut = self.get_mut();
-        if let Some(event) = self_mut.buffered_events.pop() {
-            Poll::Ready(Some(event))
-        } else {
-            loop {
-                match self_mut.stream.poll_next_unpin(cx) {
-                    Poll::Ready(Some(Err(e))) => {
-                        warn!("Error when watching etcd prefix: {}", e);
-                        continue;
-                    }
-                    Poll::Ready(Some(Ok(v))) => {
-                        
self_mut.buffered_events.extend(v.events().iter().map(|ev| {
-                            match ev.event_type() {
-                                etcd_client::EventType::Put => {
-                                    let kv = ev.kv().unwrap();
-                                    WatchEvent::Put(
-                                        kv.key_str().unwrap().to_string(),
-                                        kv.value().to_owned(),
-                                    )
-                                }
-                                etcd_client::EventType::Delete => {
-                                    let kv = ev.kv().unwrap();
-                                    
WatchEvent::Delete(kv.key_str().unwrap().to_string())
-                                }
-                            }
-                        }));
-                        if let Some(event) = self_mut.buffered_events.pop() {
-                            return Poll::Ready(Some(event));
-                        } else {
-                            continue;
-                        }
-                    }
-                    Poll::Ready(None) => return Poll::Ready(None),
-                    Poll::Pending => return Poll::Pending,
-                }
-            }
-        }
-    }
-
-    fn size_hint(&self) -> (usize, Option<usize>) {
-        self.stream.size_hint()
-    }
-}
-
-struct EtcdLockGuard {
-    etcd: etcd_client::Client,
-    lock: LockResponse,
-}
-
-// Cannot use Drop because we need this to be async
-#[tonic::async_trait]
-impl Lock for EtcdLockGuard {
-    async fn unlock(&mut self) {
-        self.etcd.unlock(self.lock.key()).await.unwrap();
-    }
-}
diff --git a/ballista/scheduler/src/cluster/storage/mod.rs 
b/ballista/scheduler/src/cluster/storage/mod.rs
deleted file mode 100644
index 6aa049c0..00000000
--- a/ballista/scheduler/src/cluster/storage/mod.rs
+++ /dev/null
@@ -1,140 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#[cfg(feature = "etcd")]
-pub mod etcd;
-#[cfg(feature = "sled")]
-pub mod sled;
-
-use async_trait::async_trait;
-use ballista_core::error::Result;
-use futures::{future, Stream};
-use std::collections::HashSet;
-use tokio::sync::OwnedMutexGuard;
-
-#[derive(Debug, Clone, Eq, PartialEq, Hash)]
-pub enum Keyspace {
-    Executors,
-    JobStatus,
-    ExecutionGraph,
-    Slots,
-    Sessions,
-    Heartbeats,
-}
-
-impl Keyspace {
-    pub fn strip_prefix<'a>(&'a self, key: &'a str) -> Option<&'a str> {
-        key.strip_prefix(&format!("{self:?}/"))
-    }
-}
-
-#[derive(Debug, Eq, PartialEq, Hash)]
-pub enum Operation {
-    Put(Vec<u8>),
-    Delete,
-}
-
-/// A trait that defines a KeyValue interface with basic locking primitives 
for persisting Ballista cluster state
-#[async_trait]
-pub trait KeyValueStore: Send + Sync + Clone + 'static {
-    /// Retrieve the data associated with a specific key in a given keyspace.
-    ///
-    /// An empty vec is returned if the key does not exist.
-    async fn get(&self, keyspace: Keyspace, key: &str) -> Result<Vec<u8>>;
-
-    /// Retrieve all key/value pairs in given keyspace matching a given key 
prefix.
-    async fn get_from_prefix(
-        &self,
-        keyspace: Keyspace,
-        prefix: &str,
-    ) -> Result<Vec<(String, Vec<u8>)>>;
-
-    /// Retrieve all key/value pairs in a given keyspace. If a limit is 
specified, will return at
-    /// most `limit` key-value pairs.
-    async fn scan(
-        &self,
-        keyspace: Keyspace,
-        limit: Option<usize>,
-    ) -> Result<Vec<(String, Vec<u8>)>>;
-
-    /// Retrieve all keys from a given keyspace (without their values). The 
implementations
-    /// should handle stripping any prefixes it may add.
-    async fn scan_keys(&self, keyspace: Keyspace) -> Result<HashSet<String>>;
-
-    /// Saves the value into the provided key, overriding any previous data 
that might have been associated to that key.
-    async fn put(&self, keyspace: Keyspace, key: String, value: Vec<u8>) -> 
Result<()>;
-
-    /// Bundle multiple operation in a single transaction. Either all values 
should be saved, or all should fail.
-    /// It can support multiple types of operations and keyspaces. If the 
count of the unique keyspace is more than one,
-    /// more than one locks has to be acquired.
-    async fn apply_txn(&self, ops: Vec<(Operation, Keyspace, String)>) -> 
Result<()>;
-    /// Acquire mutex with specified IDs.
-    async fn acquire_locks(
-        &self,
-        mut ids: Vec<(Keyspace, &str)>,
-    ) -> Result<Vec<Box<dyn Lock>>> {
-        // We always acquire locks in a specific order to avoid deadlocks.
-        ids.sort_by_key(|n| format!("/{:?}/{}", n.0, n.1));
-        future::try_join_all(ids.into_iter().map(|(ks, key)| self.lock(ks, 
key))).await
-    }
-
-    /// Atomically move the given key from one keyspace to another
-    async fn mv(
-        &self,
-        from_keyspace: Keyspace,
-        to_keyspace: Keyspace,
-        key: &str,
-    ) -> Result<()>;
-
-    /// Acquire mutex with specified ID.
-    async fn lock(&self, keyspace: Keyspace, key: &str) -> Result<Box<dyn 
Lock>>;
-
-    /// Watch all events that happen on a specific prefix.
-    async fn watch(
-        &self,
-        keyspace: Keyspace,
-        prefix: String,
-    ) -> Result<Box<dyn Watch<Item = WatchEvent>>>;
-
-    /// Permanently delete a key from state
-    async fn delete(&self, keyspace: Keyspace, key: &str) -> Result<()>;
-}
-
-/// A Watch is a cancelable stream of put or delete events in the 
[StateBackendClient]
-#[async_trait]
-pub trait Watch: Stream<Item = WatchEvent> + Send + Unpin {
-    async fn cancel(&mut self) -> Result<()>;
-}
-
-#[derive(Clone, Debug, Eq, PartialEq)]
-pub enum WatchEvent {
-    /// Contains the inserted or updated key and the new value
-    Put(String, Vec<u8>),
-
-    /// Contains the deleted key
-    Delete(String),
-}
-
-#[async_trait]
-pub trait Lock: Send + Sync {
-    async fn unlock(&mut self);
-}
-
-#[async_trait]
-impl<T: Send + Sync> Lock for OwnedMutexGuard<T> {
-    async fn unlock(&mut self) {}
-}
diff --git a/ballista/scheduler/src/cluster/storage/sled.rs 
b/ballista/scheduler/src/cluster/storage/sled.rs
deleted file mode 100644
index 6be4c504..00000000
--- a/ballista/scheduler/src/cluster/storage/sled.rs
+++ /dev/null
@@ -1,395 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::collections::{HashMap, HashSet};
-use std::{sync::Arc, task::Poll};
-
-use ballista_core::error::{ballista_error, BallistaError, Result};
-
-use crate::cluster::storage::KeyValueStore;
-use async_trait::async_trait;
-use futures::{FutureExt, Stream};
-use log::warn;
-use sled_package as sled;
-use tokio::sync::Mutex;
-
-use crate::cluster::storage::{Keyspace, Lock, Operation, Watch, WatchEvent};
-
-/// A [`StateBackendClient`] implementation that uses file-based storage to 
save cluster state.
-#[derive(Clone)]
-pub struct SledClient {
-    db: sled::Db,
-    locks: Arc<Mutex<HashMap<String, Arc<Mutex<()>>>>>,
-}
-
-impl SledClient {
-    /// Creates a SledClient that saves data to the specified file.
-    pub fn try_new<P: AsRef<std::path::Path>>(path: P) -> Result<Self> {
-        Ok(Self {
-            db: sled::open(path).map_err(sled_to_ballista_error)?,
-            locks: Arc::new(Mutex::new(HashMap::new())),
-        })
-    }
-
-    /// Creates a SledClient that saves data to a temp file.
-    pub fn try_new_temporary() -> Result<Self> {
-        Ok(Self {
-            db: sled::Config::new()
-                .temporary(true)
-                .open()
-                .map_err(sled_to_ballista_error)?,
-            locks: Arc::new(Mutex::new(HashMap::new())),
-        })
-    }
-}
-
-fn sled_to_ballista_error(e: sled::Error) -> BallistaError {
-    match e {
-        sled::Error::Io(io) => BallistaError::IoError(io),
-        _ => BallistaError::General(format!("{e}")),
-    }
-}
-
-#[async_trait]
-impl KeyValueStore for SledClient {
-    async fn get(&self, keyspace: Keyspace, key: &str) -> Result<Vec<u8>> {
-        let key = format!("/{keyspace:?}/{key}");
-        Ok(self
-            .db
-            .get(key)
-            .map_err(|e| ballista_error(&format!("sled error {e:?}")))?
-            .map(|v| v.to_vec())
-            .unwrap_or_default())
-    }
-
-    async fn get_from_prefix(
-        &self,
-        keyspace: Keyspace,
-        prefix: &str,
-    ) -> Result<Vec<(String, Vec<u8>)>> {
-        let prefix = format!("/{keyspace:?}/{prefix}");
-        Ok(self
-            .db
-            .scan_prefix(prefix)
-            .map(|v| {
-                v.map(|(key, value)| {
-                    (
-                        std::str::from_utf8(&key).unwrap().to_owned(),
-                        value.to_vec(),
-                    )
-                })
-            })
-            .collect::<std::result::Result<Vec<_>, _>>()
-            .map_err(|e| ballista_error(&format!("sled error {e:?}")))?)
-    }
-
-    async fn scan(
-        &self,
-        keyspace: Keyspace,
-        limit: Option<usize>,
-    ) -> Result<Vec<(String, Vec<u8>)>> {
-        let prefix = format!("/{keyspace:?}/");
-        if let Some(limit) = limit {
-            Ok(self
-                .db
-                .scan_prefix(prefix)
-                .take(limit)
-                .map(|v| {
-                    v.map(|(key, value)| {
-                        (
-                            std::str::from_utf8(&key).unwrap().to_owned(),
-                            value.to_vec(),
-                        )
-                    })
-                })
-                .collect::<std::result::Result<Vec<_>, _>>()
-                .map_err(|e| ballista_error(&format!("sled error {e:?}")))?)
-        } else {
-            Ok(self
-                .db
-                .scan_prefix(prefix)
-                .map(|v| {
-                    v.map(|(key, value)| {
-                        (
-                            std::str::from_utf8(&key).unwrap().to_owned(),
-                            value.to_vec(),
-                        )
-                    })
-                })
-                .collect::<std::result::Result<Vec<_>, _>>()
-                .map_err(|e| ballista_error(&format!("sled error {e:?}")))?)
-        }
-    }
-
-    async fn scan_keys(&self, keyspace: Keyspace) -> Result<HashSet<String>> {
-        let prefix = format!("/{keyspace:?}/");
-        Ok(self
-            .db
-            .scan_prefix(prefix.clone())
-            .map(|v| {
-                v.map(|(key, _value)| {
-                    std::str::from_utf8(&key)
-                        .unwrap()
-                        .strip_prefix(&prefix)
-                        .unwrap()
-                        .to_owned()
-                })
-            })
-            .collect::<std::result::Result<HashSet<_>, _>>()
-            .map_err(|e| ballista_error(&format!("sled error {e:?}")))?)
-    }
-
-    async fn put(&self, keyspace: Keyspace, key: String, value: Vec<u8>) -> 
Result<()> {
-        let key = format!("/{keyspace:?}/{key}");
-        self.db
-            .insert(key, value)
-            .map_err(|e| {
-                warn!("sled insert failed: {}", e);
-                ballista_error("sled insert failed")
-            })
-            .map(|_| ())
-    }
-
-    async fn apply_txn(&self, ops: Vec<(Operation, Keyspace, String)>) -> 
Result<()> {
-        let mut batch = sled::Batch::default();
-
-        for (op, keyspace, key_str) in ops {
-            let key = format!("/{:?}/{}", &keyspace, key_str);
-            match op {
-                Operation::Put(value) => batch.insert(key.as_str(), value),
-                Operation::Delete => batch.remove(key.as_str()),
-            }
-        }
-
-        self.db.apply_batch(batch).map_err(|e| {
-            warn!("sled transaction insert failed: {}", e);
-            ballista_error("sled operations failed")
-        })
-    }
-
-    async fn mv(
-        &self,
-        from_keyspace: Keyspace,
-        to_keyspace: Keyspace,
-        key: &str,
-    ) -> Result<()> {
-        let from_key = format!("/{from_keyspace:?}/{key}");
-        let to_key = format!("/{to_keyspace:?}/{key}");
-
-        let current_value = self
-            .db
-            .get(from_key.as_str())
-            .map_err(|e| ballista_error(&format!("sled error {e:?}")))?
-            .map(|v| v.to_vec());
-
-        if let Some(value) = current_value {
-            let mut batch = sled::Batch::default();
-
-            batch.remove(from_key.as_str());
-            batch.insert(to_key.as_str(), value);
-
-            self.db.apply_batch(batch).map_err(|e| {
-                warn!("sled transaction insert failed: {}", e);
-                ballista_error("sled insert failed")
-            })
-        } else {
-            // TODO should this return an error?
-            warn!("Cannot move value at {}, does not exist", from_key);
-            Ok(())
-        }
-    }
-
-    async fn lock(&self, keyspace: Keyspace, key: &str) -> Result<Box<dyn 
Lock>> {
-        let mut mlock = self.locks.lock().await;
-        let lock_key = format!("/{keyspace:?}/{key}");
-        if let Some(lock) = mlock.get(&lock_key) {
-            Ok(Box::new(lock.clone().lock_owned().await))
-        } else {
-            let new_lock = Arc::new(Mutex::new(()));
-            mlock.insert(lock_key, new_lock.clone());
-            Ok(Box::new(new_lock.lock_owned().await))
-        }
-    }
-
-    async fn watch(&self, keyspace: Keyspace, prefix: String) -> 
Result<Box<dyn Watch>> {
-        let prefix = format!("/{keyspace:?}/{prefix}");
-
-        Ok(Box::new(SledWatch {
-            subscriber: self.db.watch_prefix(prefix),
-        }))
-    }
-
-    async fn delete(&self, keyspace: Keyspace, key: &str) -> Result<()> {
-        let key = format!("/{keyspace:?}/{key}");
-        self.db.remove(key).map_err(|e| {
-            warn!("sled delete failed: {:?}", e);
-            ballista_error("sled delete failed")
-        })?;
-        Ok(())
-    }
-}
-
-struct SledWatch {
-    subscriber: sled::Subscriber,
-}
-
-#[tonic::async_trait]
-impl Watch for SledWatch {
-    async fn cancel(&mut self) -> Result<()> {
-        Ok(())
-    }
-}
-
-impl Stream for SledWatch {
-    type Item = WatchEvent;
-
-    fn poll_next(
-        self: std::pin::Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> Poll<Option<Self::Item>> {
-        match self.get_mut().subscriber.poll_unpin(cx) {
-            Poll::Pending => Poll::Pending,
-            Poll::Ready(None) => Poll::Ready(None),
-            Poll::Ready(Some(sled::Event::Insert { key, value })) => {
-                let key = std::str::from_utf8(&key).unwrap().to_owned();
-                Poll::Ready(Some(WatchEvent::Put(key, value.to_vec())))
-            }
-            Poll::Ready(Some(sled::Event::Remove { key })) => {
-                let key = std::str::from_utf8(&key).unwrap().to_owned();
-                Poll::Ready(Some(WatchEvent::Delete(key)))
-            }
-        }
-    }
-
-    fn size_hint(&self) -> (usize, Option<usize>) {
-        self.subscriber.size_hint()
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::{KeyValueStore, SledClient, Watch, WatchEvent};
-
-    use crate::cluster::storage::{Keyspace, Operation};
-
-    use futures::StreamExt;
-    use std::result::Result;
-
-    fn create_instance() -> Result<SledClient, Box<dyn std::error::Error>> {
-        Ok(SledClient::try_new_temporary()?)
-    }
-
-    #[tokio::test]
-    async fn put_read() -> Result<(), Box<dyn std::error::Error>> {
-        let client = create_instance()?;
-        let key = "key";
-        let value = "value".as_bytes();
-        client
-            .put(Keyspace::Slots, key.to_owned(), value.to_vec())
-            .await?;
-        assert_eq!(client.get(Keyspace::Slots, key).await?, value);
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn multiple_operation() -> Result<(), Box<dyn std::error::Error>> {
-        let client = create_instance()?;
-        let key = "key".to_string();
-        let value = "value".as_bytes().to_vec();
-        {
-            let _locks = client
-                .acquire_locks(vec![(Keyspace::JobStatus, ""), 
(Keyspace::Slots, "")])
-                .await?;
-
-            let txn_ops = vec![
-                (Operation::Put(value.clone()), Keyspace::Slots, key.clone()),
-                (
-                    Operation::Put(value.clone()),
-                    Keyspace::JobStatus,
-                    key.clone(),
-                ),
-            ];
-            client.apply_txn(txn_ops).await?;
-        }
-
-        assert_eq!(client.get(Keyspace::Slots, key.as_str()).await?, value);
-        assert_eq!(client.get(Keyspace::JobStatus, key.as_str()).await?, 
value);
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn read_empty() -> Result<(), Box<dyn std::error::Error>> {
-        let client = create_instance()?;
-        let key = "key";
-        let empty: &[u8] = &[];
-        assert_eq!(client.get(Keyspace::Slots, key).await?, empty);
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn read_prefix() -> Result<(), Box<dyn std::error::Error>> {
-        let client = create_instance()?;
-        let key = "key";
-        let value = "value".as_bytes();
-        client
-            .put(Keyspace::Slots, format!("{key}/1"), value.to_vec())
-            .await?;
-        client
-            .put(Keyspace::Slots, format!("{key}/2"), value.to_vec())
-            .await?;
-        assert_eq!(
-            client.get_from_prefix(Keyspace::Slots, key).await?,
-            vec![
-                ("/Slots/key/1".to_owned(), value.to_vec()),
-                ("/Slots/key/2".to_owned(), value.to_vec())
-            ]
-        );
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn read_watch() -> Result<(), Box<dyn std::error::Error>> {
-        let client = create_instance()?;
-        let key = "key";
-        let value = "value".as_bytes();
-        let mut watch: Box<dyn Watch<Item = WatchEvent>> =
-            client.watch(Keyspace::Slots, key.to_owned()).await?;
-        client
-            .put(Keyspace::Slots, key.to_owned(), value.to_vec())
-            .await?;
-        assert_eq!(
-            watch.next().await,
-            Some(WatchEvent::Put(
-                format!("/{:?}/{}", Keyspace::Slots, key.to_owned()),
-                value.to_owned()
-            ))
-        );
-        let value2 = "value2".as_bytes();
-        client
-            .put(Keyspace::Slots, key.to_owned(), value2.to_vec())
-            .await?;
-        assert_eq!(
-            watch.next().await,
-            Some(WatchEvent::Put(
-                format!("/{:?}/{}", Keyspace::Slots, key.to_owned()),
-                value2.to_owned()
-            ))
-        );
-        watch.cancel().await?;
-        Ok(())
-    }
-}
diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs
index 82280911..ce542e51 100644
--- a/ballista/scheduler/src/config.rs
+++ b/ballista/scheduler/src/config.rs
@@ -180,10 +180,6 @@ impl SchedulerConfig {
 #[derive(Clone, Debug)]
 pub enum ClusterStorageConfig {
     Memory,
-    #[cfg(feature = "etcd")]
-    Etcd(Vec<String>),
-    #[cfg(feature = "sled")]
-    Sled(Option<String>),
 }
 
 /// Policy of distributing tasks to available executor slots
diff --git a/ballista/scheduler/src/lib.rs b/ballista/scheduler/src/lib.rs
index cd6f047b..946e475f 100644
--- a/ballista/scheduler/src/lib.rs
+++ b/ballista/scheduler/src/lib.rs
@@ -25,7 +25,6 @@ pub mod metrics;
 pub mod planner;
 pub mod scheduler_process;
 pub mod scheduler_server;
-#[cfg(feature = "sled")]
 pub mod standalone;
 pub mod state;
 
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs 
b/ballista/scheduler/src/scheduler_server/grpc.rs
index 6992bf75..0d7d5e36 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -643,7 +643,7 @@ fn extract_connect_info<T>(request: &Request<T>) -> 
Option<ConnectInfo<SocketAdd
         .cloned()
 }
 
-#[cfg(all(test, feature = "sled"))]
+#[cfg(test)]
 mod test {
     use std::sync::Arc;
     use std::time::Duration;
diff --git a/ballista/scheduler/src/scheduler_server/mod.rs 
b/ballista/scheduler/src/scheduler_server/mod.rs
index c2bf657b..e6e6739e 100644
--- a/ballista/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/scheduler/src/scheduler_server/mod.rs
@@ -337,7 +337,7 @@ pub fn timestamp_millis() -> u64 {
         .as_millis() as u64
 }
 
-#[cfg(all(test, feature = "sled"))]
+#[cfg(test)]
 mod test {
     use std::sync::Arc;
 
diff --git a/ballista/scheduler/src/standalone.rs 
b/ballista/scheduler/src/standalone.rs
index cb74b2bf..bb6d7006 100644
--- a/ballista/scheduler/src/standalone.rs
+++ b/ballista/scheduler/src/standalone.rs
@@ -18,7 +18,7 @@
 use crate::cluster::BallistaCluster;
 use crate::config::SchedulerConfig;
 use crate::metrics::default_metrics_collector;
-use crate::{cluster::storage::sled::SledClient, 
scheduler_server::SchedulerServer};
+use crate::scheduler_server::SchedulerServer;
 use ballista_core::serde::BallistaCodec;
 use ballista_core::utils::{create_grpc_server, default_session_builder};
 use ballista_core::{
@@ -35,12 +35,7 @@ use tokio::net::TcpListener;
 pub async fn new_standalone_scheduler() -> Result<SocketAddr> {
     let metrics_collector = default_metrics_collector()?;
 
-    let cluster = BallistaCluster::new_kv(
-        SledClient::try_new_temporary()?,
-        "localhost:50050",
-        default_session_builder,
-        BallistaCodec::default(),
-    );
+    let cluster = BallistaCluster::new_memory("localhost:50050", 
default_session_builder);
 
     let mut scheduler_server: SchedulerServer<LogicalPlanNode, 
PhysicalPlanNode> =
         SchedulerServer::new(
diff --git a/ballista/scheduler/src/state/execution_graph.rs 
b/ballista/scheduler/src/state/execution_graph.rs
index 333545d3..e72e3906 100644
--- a/ballista/scheduler/src/state/execution_graph.rs
+++ b/ballista/scheduler/src/state/execution_graph.rs
@@ -24,25 +24,20 @@ use std::time::{SystemTime, UNIX_EPOCH};
 
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
 use datafusion::physical_plan::{accept, ExecutionPlan, ExecutionPlanVisitor};
-use datafusion::prelude::SessionContext;
-use datafusion_proto::logical_plan::AsLogicalPlan;
 use log::{error, info, warn};
 
 use ballista_core::error::{BallistaError, Result};
 use ballista_core::execution_plans::{ShuffleWriterExec, UnresolvedShuffleExec};
 use ballista_core::serde::protobuf::failed_task::FailedReason;
 use ballista_core::serde::protobuf::job_status::Status;
-use ballista_core::serde::protobuf::{
-    self, execution_graph_stage::StageType, FailedTask, JobStatus, ResultLost,
-    RunningJob, SuccessfulJob, TaskStatus,
-};
 use ballista_core::serde::protobuf::{job_status, FailedJob, 
ShuffleWritePartition};
 use ballista_core::serde::protobuf::{task_status, RunningTask};
+use ballista_core::serde::protobuf::{
+    FailedTask, JobStatus, ResultLost, RunningJob, SuccessfulJob, TaskStatus,
+};
 use ballista_core::serde::scheduler::{
     ExecutorMetadata, PartitionId, PartitionLocation, PartitionStats,
 };
-use ballista_core::serde::BallistaCodec;
-use datafusion_proto::physical_plan::AsExecutionPlan;
 
 use crate::display::print_stage_metrics;
 use crate::planner::DistributedPlanner;
@@ -50,8 +45,7 @@ use crate::scheduler_server::event::QueryStageSchedulerEvent;
 use crate::scheduler_server::timestamp_millis;
 use crate::state::execution_graph::execution_stage::RunningStage;
 pub(crate) use crate::state::execution_graph::execution_stage::{
-    ExecutionStage, FailedStage, ResolvedStage, StageOutput, SuccessfulStage, 
TaskInfo,
-    UnresolvedStage,
+    ExecutionStage, ResolvedStage, StageOutput, TaskInfo, UnresolvedStage,
 };
 use crate::state::task_manager::UpdatedStages;
 
@@ -103,6 +97,7 @@ mod execution_stage;
 #[derive(Clone)]
 pub struct ExecutionGraph {
     /// Curator scheduler name. Can be `None` is `ExecutionGraph` is not 
currently curated by any scheduler
+    #[allow(dead_code)] // not used at the moment, will be used later
     scheduler_id: Option<String>,
     /// ID for this job
     job_id: String,
@@ -121,6 +116,7 @@ pub struct ExecutionGraph {
     /// Map from Stage ID -> ExecutionStage
     stages: HashMap<usize, ExecutionStage>,
     /// Total number fo output partitions
+    #[allow(dead_code)] // not used at the moment, will be used later
     output_partitions: usize,
     /// Locations of this `ExecutionGraph` final output locations
     output_locations: Vec<PartitionLocation>,
@@ -1317,163 +1313,6 @@ impl ExecutionGraph {
     fn clear_stage_failure(&mut self, stage_id: usize) {
         self.failed_stage_attempts.remove(&stage_id);
     }
-
-    pub(crate) async fn decode_execution_graph<
-        T: 'static + AsLogicalPlan,
-        U: 'static + AsExecutionPlan,
-    >(
-        proto: protobuf::ExecutionGraph,
-        codec: &BallistaCodec<T, U>,
-        session_ctx: &SessionContext,
-    ) -> Result<ExecutionGraph> {
-        let mut stages: HashMap<usize, ExecutionStage> = HashMap::new();
-        for graph_stage in proto.stages {
-            let stage_type = graph_stage.stage_type.expect("Unexpected empty 
stage");
-
-            let execution_stage = match stage_type {
-                StageType::UnresolvedStage(stage) => {
-                    let stage: UnresolvedStage =
-                        UnresolvedStage::decode(stage, codec, session_ctx)?;
-                    (stage.stage_id, ExecutionStage::UnResolved(stage))
-                }
-                StageType::ResolvedStage(stage) => {
-                    let stage: ResolvedStage =
-                        ResolvedStage::decode(stage, codec, session_ctx)?;
-                    (stage.stage_id, ExecutionStage::Resolved(stage))
-                }
-                StageType::SuccessfulStage(stage) => {
-                    let stage: SuccessfulStage =
-                        SuccessfulStage::decode(stage, codec, session_ctx)?;
-                    (stage.stage_id, ExecutionStage::Successful(stage))
-                }
-                StageType::FailedStage(stage) => {
-                    let stage: FailedStage =
-                        FailedStage::decode(stage, codec, session_ctx)?;
-                    (stage.stage_id, ExecutionStage::Failed(stage))
-                }
-            };
-
-            stages.insert(execution_stage.0, execution_stage.1);
-        }
-
-        let output_locations: Vec<PartitionLocation> = proto
-            .output_locations
-            .into_iter()
-            .map(|loc| loc.try_into())
-            .collect::<Result<Vec<_>>>()?;
-
-        let failed_stage_attempts = proto
-            .failed_attempts
-            .into_iter()
-            .map(|attempt| {
-                (
-                    attempt.stage_id as usize,
-                    HashSet::from_iter(
-                        attempt
-                            .stage_attempt_num
-                            .into_iter()
-                            .map(|num| num as usize),
-                    ),
-                )
-            })
-            .collect();
-
-        Ok(ExecutionGraph {
-            scheduler_id: 
(!proto.scheduler_id.is_empty()).then_some(proto.scheduler_id),
-            job_id: proto.job_id,
-            job_name: proto.job_name,
-            session_id: proto.session_id,
-            status: proto.status.ok_or_else(|| {
-                BallistaError::Internal(
-                    "Invalid Execution Graph: missing job status".to_owned(),
-                )
-            })?,
-            queued_at: proto.queued_at,
-            start_time: proto.start_time,
-            end_time: proto.end_time,
-            stages,
-            output_partitions: proto.output_partitions as usize,
-            output_locations,
-            task_id_gen: proto.task_id_gen as usize,
-            failed_stage_attempts,
-        })
-    }
-
-    /// Running stages will not be persisted so that will not be encoded.
-    /// Running stages will be convert back to the resolved stages to be 
encoded and persisted
-    pub(crate) fn encode_execution_graph<
-        T: 'static + AsLogicalPlan,
-        U: 'static + AsExecutionPlan,
-    >(
-        graph: ExecutionGraph,
-        codec: &BallistaCodec<T, U>,
-    ) -> Result<protobuf::ExecutionGraph> {
-        let job_id = graph.job_id().to_owned();
-
-        let stages = graph
-            .stages
-            .into_values()
-            .map(|stage| {
-                let stage_type = match stage {
-                    ExecutionStage::UnResolved(stage) => {
-                        
StageType::UnresolvedStage(UnresolvedStage::encode(stage, codec)?)
-                    }
-                    ExecutionStage::Resolved(stage) => {
-                        StageType::ResolvedStage(ResolvedStage::encode(stage, 
codec)?)
-                    }
-                    ExecutionStage::Running(stage) => StageType::ResolvedStage(
-                        ResolvedStage::encode(stage.to_resolved(), codec)?,
-                    ),
-                    ExecutionStage::Successful(stage) => 
StageType::SuccessfulStage(
-                        SuccessfulStage::encode(job_id.clone(), stage, codec)?,
-                    ),
-                    ExecutionStage::Failed(stage) => StageType::FailedStage(
-                        FailedStage::encode(job_id.clone(), stage, codec)?,
-                    ),
-                };
-                Ok(protobuf::ExecutionGraphStage {
-                    stage_type: Some(stage_type),
-                })
-            })
-            .collect::<Result<Vec<_>>>()?;
-
-        let output_locations: Vec<protobuf::PartitionLocation> = graph
-            .output_locations
-            .into_iter()
-            .map(|loc| loc.try_into())
-            .collect::<Result<Vec<_>>>()?;
-
-        let failed_attempts: Vec<protobuf::StageAttempts> = graph
-            .failed_stage_attempts
-            .into_iter()
-            .map(|(stage_id, attempts)| {
-                let stage_attempt_num = attempts
-                    .into_iter()
-                    .map(|num| num as u32)
-                    .collect::<Vec<_>>();
-                protobuf::StageAttempts {
-                    stage_id: stage_id as u32,
-                    stage_attempt_num,
-                }
-            })
-            .collect::<Vec<_>>();
-
-        Ok(protobuf::ExecutionGraph {
-            job_id: graph.job_id,
-            job_name: graph.job_name,
-            session_id: graph.session_id,
-            status: Some(graph.status),
-            queued_at: graph.queued_at,
-            start_time: graph.start_time,
-            end_time: graph.end_time,
-            stages,
-            output_partitions: graph.output_partitions as u64,
-            output_locations,
-            scheduler_id: graph.scheduler_id.unwrap_or_default(),
-            task_id_gen: graph.task_id_gen as u32,
-            failed_attempts,
-        })
-    }
 }
 
 impl Debug for ExecutionGraph {
diff --git a/ballista/scheduler/src/state/execution_graph/execution_stage.rs 
b/ballista/scheduler/src/state/execution_graph/execution_stage.rs
index 65a2d012..d919167b 100644
--- a/ballista/scheduler/src/state/execution_graph/execution_stage.rs
+++ b/ballista/scheduler/src/state/execution_graph/execution_stage.rs
@@ -18,7 +18,6 @@
 use std::collections::{HashMap, HashSet};
 use std::convert::TryInto;
 use std::fmt::{Debug, Formatter};
-use std::iter::FromIterator;
 use std::sync::Arc;
 use std::time::{SystemTime, UNIX_EPOCH};
 
@@ -27,21 +26,17 @@ use datafusion::physical_optimizer::PhysicalOptimizerRule;
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
 use datafusion::physical_plan::metrics::{MetricValue, MetricsSet};
 use datafusion::physical_plan::{ExecutionPlan, Metric};
-use datafusion::prelude::{SessionConfig, SessionContext};
-use datafusion_proto::logical_plan::AsLogicalPlan;
+use datafusion::prelude::SessionConfig;
 use log::{debug, warn};
 
 use ballista_core::error::{BallistaError, Result};
 use ballista_core::execution_plans::ShuffleWriterExec;
 use ballista_core::serde::protobuf::failed_task::FailedReason;
+use ballista_core::serde::protobuf::{task_status, RunningTask};
 use ballista_core::serde::protobuf::{
-    self, task_info, FailedTask, GraphStageInput, OperatorMetricsSet, 
ResultLost,
-    SuccessfulTask, TaskStatus,
+    FailedTask, OperatorMetricsSet, ResultLost, SuccessfulTask, TaskStatus,
 };
-use ballista_core::serde::protobuf::{task_status, RunningTask};
 use ballista_core::serde::scheduler::PartitionLocation;
-use ballista_core::serde::BallistaCodec;
-use datafusion_proto::physical_plan::AsExecutionPlan;
 
 use crate::display::DisplayableBallistaExecutionPlan;
 
@@ -207,6 +202,7 @@ pub(crate) struct FailedStage {
     pub(crate) partitions: usize,
     /// Stage ID of the stage that will take this stages outputs as inputs.
     /// If `output_links` is empty then this the final stage in the 
`ExecutionGraph`
+    #[allow(dead_code)] // not used at the moment, will be used later
     pub(crate) output_links: Vec<usize>,
     /// `ExecutionPlan` for this stage
     pub(crate) plan: Arc<dyn ExecutionPlan>,
@@ -214,12 +210,14 @@ pub(crate) struct FailedStage {
     /// The index of the Vec is the task's partition id
     pub(crate) task_infos: Vec<Option<TaskInfo>>,
     /// Combined metrics of the already finished tasks in the stage, If it is 
None, no task is finished yet.
+    #[allow(dead_code)] // not used at the moment, will be used later
     pub(crate) stage_metrics: Option<Vec<MetricsSet>>,
     /// Error message
     pub(crate) error_message: String,
 }
 
 #[derive(Clone)]
+#[allow(dead_code)] // we may use the fields later
 pub(crate) struct TaskInfo {
     /// Task ID
     pub(super) task_id: usize,
@@ -368,54 +366,6 @@ impl UnresolvedStage {
             self.last_attempt_failure_reasons.clone(),
         ))
     }
-
-    pub(super) fn decode<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>(
-        stage: protobuf::UnResolvedStage,
-        codec: &BallistaCodec<T, U>,
-        session_ctx: &SessionContext,
-    ) -> Result<UnresolvedStage> {
-        let plan_proto = U::try_decode(&stage.plan)?;
-        let plan = plan_proto.try_into_physical_plan(
-            session_ctx,
-            session_ctx.runtime_env().as_ref(),
-            codec.physical_extension_codec(),
-        )?;
-
-        let inputs = decode_inputs(stage.inputs)?;
-
-        Ok(UnresolvedStage {
-            stage_id: stage.stage_id as usize,
-            stage_attempt_num: stage.stage_attempt_num as usize,
-            output_links: stage.output_links.into_iter().map(|l| l as 
usize).collect(),
-            plan,
-            inputs,
-            last_attempt_failure_reasons: HashSet::from_iter(
-                stage.last_attempt_failure_reasons,
-            ),
-        })
-    }
-
-    pub(super) fn encode<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>(
-        stage: UnresolvedStage,
-        codec: &BallistaCodec<T, U>,
-    ) -> Result<protobuf::UnResolvedStage> {
-        let mut plan: Vec<u8> = vec![];
-        U::try_from_physical_plan(stage.plan, codec.physical_extension_codec())
-            .and_then(|proto| proto.try_encode(&mut plan))?;
-
-        let inputs = encode_inputs(stage.inputs)?;
-
-        Ok(protobuf::UnResolvedStage {
-            stage_id: stage.stage_id as u32,
-            stage_attempt_num: stage.stage_attempt_num as u32,
-            output_links: stage.output_links.into_iter().map(|l| l as 
u32).collect(),
-            inputs,
-            plan,
-            last_attempt_failure_reasons: Vec::from_iter(
-                stage.last_attempt_failure_reasons,
-            ),
-        })
-    }
 }
 
 impl Debug for UnresolvedStage {
@@ -482,56 +432,6 @@ impl ResolvedStage {
         );
         Ok(unresolved)
     }
-
-    pub(super) fn decode<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>(
-        stage: protobuf::ResolvedStage,
-        codec: &BallistaCodec<T, U>,
-        session_ctx: &SessionContext,
-    ) -> Result<ResolvedStage> {
-        let plan_proto = U::try_decode(&stage.plan)?;
-        let plan = plan_proto.try_into_physical_plan(
-            session_ctx,
-            session_ctx.runtime_env().as_ref(),
-            codec.physical_extension_codec(),
-        )?;
-
-        let inputs = decode_inputs(stage.inputs)?;
-
-        Ok(ResolvedStage {
-            stage_id: stage.stage_id as usize,
-            stage_attempt_num: stage.stage_attempt_num as usize,
-            partitions: stage.partitions as usize,
-            output_links: stage.output_links.into_iter().map(|l| l as 
usize).collect(),
-            inputs,
-            plan,
-            last_attempt_failure_reasons: HashSet::from_iter(
-                stage.last_attempt_failure_reasons,
-            ),
-        })
-    }
-
-    pub(super) fn encode<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>(
-        stage: ResolvedStage,
-        codec: &BallistaCodec<T, U>,
-    ) -> Result<protobuf::ResolvedStage> {
-        let mut plan: Vec<u8> = vec![];
-        U::try_from_physical_plan(stage.plan, codec.physical_extension_codec())
-            .and_then(|proto| proto.try_encode(&mut plan))?;
-
-        let inputs = encode_inputs(stage.inputs)?;
-
-        Ok(protobuf::ResolvedStage {
-            stage_id: stage.stage_id as u32,
-            stage_attempt_num: stage.stage_attempt_num as u32,
-            partitions: stage.partitions as u32,
-            output_links: stage.output_links.into_iter().map(|l| l as 
u32).collect(),
-            inputs,
-            plan,
-            last_attempt_failure_reasons: Vec::from_iter(
-                stage.last_attempt_failure_reasons,
-            ),
-        })
-    }
 }
 
 impl Debug for ResolvedStage {
@@ -611,18 +511,6 @@ impl RunningStage {
         }
     }
 
-    /// Change to the resolved state and bump the stage attempt number
-    pub(super) fn to_resolved(&self) -> ResolvedStage {
-        ResolvedStage::new(
-            self.stage_id,
-            self.stage_attempt_num + 1,
-            self.plan.clone(),
-            self.output_links.clone(),
-            self.inputs.clone(),
-            HashSet::new(),
-        )
-    }
-
     /// Change to the unresolved state and bump the stage attempt number
     pub(super) fn to_unresolved(
         &self,
@@ -952,80 +840,6 @@ impl SuccessfulStage {
         }
         reset
     }
-
-    pub(super) fn decode<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>(
-        stage: protobuf::SuccessfulStage,
-        codec: &BallistaCodec<T, U>,
-        session_ctx: &SessionContext,
-    ) -> Result<SuccessfulStage> {
-        let plan_proto = U::try_decode(&stage.plan)?;
-        let plan = plan_proto.try_into_physical_plan(
-            session_ctx,
-            session_ctx.runtime_env().as_ref(),
-            codec.physical_extension_codec(),
-        )?;
-
-        let inputs = decode_inputs(stage.inputs)?;
-        assert_eq!(
-            stage.task_infos.len(),
-            stage.partitions as usize,
-            "protobuf::SuccessfulStage task_infos len not equal to partitions."
-        );
-        let task_infos = 
stage.task_infos.into_iter().map(decode_taskinfo).collect();
-        let stage_metrics = stage
-            .stage_metrics
-            .into_iter()
-            .map(|m| m.try_into())
-            .collect::<Result<Vec<_>>>()?;
-
-        Ok(SuccessfulStage {
-            stage_id: stage.stage_id as usize,
-            stage_attempt_num: stage.stage_attempt_num as usize,
-            partitions: stage.partitions as usize,
-            output_links: stage.output_links.into_iter().map(|l| l as 
usize).collect(),
-            inputs,
-            plan,
-            task_infos,
-            stage_metrics,
-        })
-    }
-
-    pub(super) fn encode<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>(
-        _job_id: String,
-        stage: SuccessfulStage,
-        codec: &BallistaCodec<T, U>,
-    ) -> Result<protobuf::SuccessfulStage> {
-        let stage_id = stage.stage_id;
-
-        let mut plan: Vec<u8> = vec![];
-        U::try_from_physical_plan(stage.plan, codec.physical_extension_codec())
-            .and_then(|proto| proto.try_encode(&mut plan))?;
-
-        let inputs = encode_inputs(stage.inputs)?;
-        let task_infos = stage
-            .task_infos
-            .into_iter()
-            .enumerate()
-            .map(|(partition, task_info)| encode_taskinfo(task_info, 
partition))
-            .collect();
-
-        let stage_metrics = stage
-            .stage_metrics
-            .into_iter()
-            .map(|m| m.try_into())
-            .collect::<Result<Vec<_>>>()?;
-
-        Ok(protobuf::SuccessfulStage {
-            stage_id: stage_id as u32,
-            stage_attempt_num: stage.stage_attempt_num as u32,
-            partitions: stage.partitions as u32,
-            output_links: stage.output_links.into_iter().map(|l| l as 
u32).collect(),
-            inputs,
-            plan,
-            task_infos,
-            stage_metrics,
-        })
-    }
 }
 
 impl Debug for SuccessfulStage {
@@ -1071,85 +885,6 @@ impl FailedStage {
     pub(super) fn available_tasks(&self) -> usize {
         self.task_infos.iter().filter(|s| s.is_none()).count()
     }
-
-    pub(super) fn decode<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>(
-        stage: protobuf::FailedStage,
-        codec: &BallistaCodec<T, U>,
-        session_ctx: &SessionContext,
-    ) -> Result<FailedStage> {
-        let plan_proto = U::try_decode(&stage.plan)?;
-        let plan = plan_proto.try_into_physical_plan(
-            session_ctx,
-            session_ctx.runtime_env().as_ref(),
-            codec.physical_extension_codec(),
-        )?;
-
-        let mut task_infos: Vec<Option<TaskInfo>> = vec![None; 
stage.partitions as usize];
-        for info in stage.task_infos {
-            task_infos[info.partition_id as usize] = 
Some(decode_taskinfo(info.clone()));
-        }
-
-        let stage_metrics = if stage.stage_metrics.is_empty() {
-            None
-        } else {
-            let ms = stage
-                .stage_metrics
-                .into_iter()
-                .map(|m| m.try_into())
-                .collect::<Result<Vec<_>>>()?;
-            Some(ms)
-        };
-
-        Ok(FailedStage {
-            stage_id: stage.stage_id as usize,
-            stage_attempt_num: stage.stage_attempt_num as usize,
-            partitions: stage.partitions as usize,
-            output_links: stage.output_links.into_iter().map(|l| l as 
usize).collect(),
-            plan,
-            task_infos,
-            stage_metrics,
-            error_message: stage.error_message,
-        })
-    }
-
-    pub(super) fn encode<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>(
-        _job_id: String,
-        stage: FailedStage,
-        codec: &BallistaCodec<T, U>,
-    ) -> Result<protobuf::FailedStage> {
-        let stage_id = stage.stage_id;
-
-        let mut plan: Vec<u8> = vec![];
-        U::try_from_physical_plan(stage.plan, codec.physical_extension_codec())
-            .and_then(|proto| proto.try_encode(&mut plan))?;
-
-        let task_infos: Vec<protobuf::TaskInfo> = stage
-            .task_infos
-            .into_iter()
-            .enumerate()
-            .filter_map(|(partition, task_info)| {
-                task_info.map(|info| encode_taskinfo(info, partition))
-            })
-            .collect();
-
-        let stage_metrics = stage
-            .stage_metrics
-            .unwrap_or_default()
-            .into_iter()
-            .map(|m| m.try_into())
-            .collect::<Result<Vec<_>>>()?;
-
-        Ok(protobuf::FailedStage {
-            stage_id: stage_id as u32,
-            stage_attempt_num: stage.stage_attempt_num as u32,
-            partitions: stage.partitions as u32,
-            output_links: stage.output_links.into_iter().map(|l| l as 
u32).collect(),
-            plan,
-            task_infos,
-            stage_metrics,
-            error_message: stage.error_message,
-        })
-    }
 }
 
 impl Debug for FailedStage {
@@ -1220,106 +955,3 @@ impl StageOutput {
         self.complete
     }
 }
-
-fn decode_inputs(
-    stage_inputs: Vec<GraphStageInput>,
-) -> Result<HashMap<usize, StageOutput>> {
-    let mut inputs: HashMap<usize, StageOutput> = HashMap::new();
-    for input in stage_inputs {
-        let stage_id = input.stage_id as usize;
-
-        let outputs = input
-            .partition_locations
-            .into_iter()
-            .map(|loc| {
-                let partition = loc.partition as usize;
-                let locations = loc
-                    .partition_location
-                    .into_iter()
-                    .map(|l| l.try_into())
-                    .collect::<Result<Vec<_>>>()?;
-                Ok((partition, locations))
-            })
-            .collect::<Result<HashMap<usize, Vec<PartitionLocation>>>>()?;
-
-        inputs.insert(
-            stage_id,
-            StageOutput {
-                partition_locations: outputs,
-                complete: input.complete,
-            },
-        );
-    }
-    Ok(inputs)
-}
-
-fn encode_inputs(
-    stage_inputs: HashMap<usize, StageOutput>,
-) -> Result<Vec<GraphStageInput>> {
-    let mut inputs: Vec<protobuf::GraphStageInput> = vec![];
-    for (stage_id, output) in stage_inputs.into_iter() {
-        inputs.push(protobuf::GraphStageInput {
-            stage_id: stage_id as u32,
-            partition_locations: output
-                .partition_locations
-                .into_iter()
-                .map(|(partition, locations)| {
-                    Ok(protobuf::TaskInputPartitions {
-                        partition: partition as u32,
-                        partition_location: locations
-                            .into_iter()
-                            .map(|l| l.try_into())
-                            .collect::<Result<Vec<_>>>()?,
-                    })
-                })
-                .collect::<Result<Vec<_>>>()?,
-            complete: output.complete,
-        });
-    }
-    Ok(inputs)
-}
-
-fn decode_taskinfo(task_info: protobuf::TaskInfo) -> TaskInfo {
-    let task_info_status = match task_info.status {
-        Some(task_info::Status::Running(running)) => {
-            task_status::Status::Running(running)
-        }
-        Some(task_info::Status::Failed(failed)) => 
task_status::Status::Failed(failed),
-        Some(task_info::Status::Successful(success)) => {
-            task_status::Status::Successful(success)
-        }
-        _ => panic!(
-            "protobuf::TaskInfo status for task {} should not be none",
-            task_info.task_id
-        ),
-    };
-    TaskInfo {
-        task_id: task_info.task_id as usize,
-        scheduled_time: task_info.scheduled_time as u128,
-        launch_time: task_info.launch_time as u128,
-        start_exec_time: task_info.start_exec_time as u128,
-        end_exec_time: task_info.end_exec_time as u128,
-        finish_time: task_info.finish_time as u128,
-        task_status: task_info_status,
-    }
-}
-
-fn encode_taskinfo(task_info: TaskInfo, partition_id: usize) -> 
protobuf::TaskInfo {
-    let task_info_status = match task_info.task_status {
-        task_status::Status::Running(running) => 
task_info::Status::Running(running),
-        task_status::Status::Failed(failed) => 
task_info::Status::Failed(failed),
-        task_status::Status::Successful(success) => {
-            task_info::Status::Successful(success)
-        }
-    };
-    protobuf::TaskInfo {
-        task_id: task_info.task_id as u32,
-        partition_id: partition_id as u32,
-        scheduled_time: task_info.scheduled_time as u64,
-        launch_time: task_info.launch_time as u64,
-        start_exec_time: task_info.start_exec_time as u64,
-        end_exec_time: task_info.end_exec_time as u64,
-        finish_time: task_info.finish_time as u64,
-        status: Some(task_info_status),
-    }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to