This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new f073367d remove key-value stores for scheduler persistence (#1077)
f073367d is described below
commit f073367da57ff066c3fc8855c7382330ad5553d0
Author: Marko Milenković <[email protected]>
AuthorDate: Sat Oct 12 15:51:43 2024 +0100
remove key-value stores for scheduler persistence (#1077)
only the in-memory store is left; if HA is required, another
strategy such as check-pointing should be implemented
---
.github/workflows/rust.yml | 2 -
ballista/scheduler/Cargo.toml | 8 +-
ballista/scheduler/scheduler_config_spec.toml | 20 -
ballista/scheduler/src/bin/main.rs | 18 +-
ballista/scheduler/src/cluster/kv.rs | 817 ---------------------
ballista/scheduler/src/cluster/mod.rs | 85 +--
ballista/scheduler/src/cluster/storage/etcd.rs | 346 ---------
ballista/scheduler/src/cluster/storage/mod.rs | 140 ----
ballista/scheduler/src/cluster/storage/sled.rs | 395 ----------
ballista/scheduler/src/config.rs | 4 -
ballista/scheduler/src/lib.rs | 1 -
ballista/scheduler/src/scheduler_server/grpc.rs | 2 +-
ballista/scheduler/src/scheduler_server/mod.rs | 2 +-
ballista/scheduler/src/standalone.rs | 9 +-
ballista/scheduler/src/state/execution_graph.rs | 173 +----
.../src/state/execution_graph/execution_stage.rs | 380 +---------
16 files changed, 21 insertions(+), 2381 deletions(-)
diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index db1b7fc9..b1832b16 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -176,8 +176,6 @@ jobs:
export ARROW_TEST_DATA=$(pwd)/testing/data
export PARQUET_TEST_DATA=$(pwd)/parquet-testing/data
cd ballista
- # snmalloc requires cmake so build without default features
- cargo test --no-default-features --features sled
# Ensure also compiles in standalone mode
cargo test --no-default-features --features standalone
env:
diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml
index 596db6d1..04837849 100644
--- a/ballista/scheduler/Cargo.toml
+++ b/ballista/scheduler/Cargo.toml
@@ -34,11 +34,9 @@ name = "ballista-scheduler"
path = "src/bin/main.rs"
[features]
-default = ["etcd", "sled", "flight-sql"]
-etcd = ["etcd-client"]
+default = ["flight-sql"]
flight-sql = []
prometheus-metrics = ["prometheus", "once_cell"]
-sled = ["sled_package", "tokio-stream"]
[dependencies]
@@ -54,7 +52,6 @@ configure_me = { workspace = true }
dashmap = "5.4.0"
datafusion = { workspace = true }
datafusion-proto = { workspace = true }
-etcd-client = { version = "0.14", optional = true }
flatbuffers = { version = "23.5.26" }
futures = "0.3"
graphviz-rust = "0.8.0"
@@ -70,9 +67,8 @@ prost = { workspace = true }
prost-types = { workspace = true }
rand = "0.8"
serde = { version = "1", features = ["derive"] }
-sled_package = { package = "sled", version = "0.34", optional = true }
tokio = { version = "1.0", features = ["full"] }
-tokio-stream = { version = "0.1", features = ["net"], optional = true }
+tokio-stream = { version = "0.1", features = ["net"] }
tonic = { workspace = true }
# tonic 0.12.2 depends on tower 0.4.7
tower = { version = "0.4.7", default-features = false, features = ["make",
"util"] }
diff --git a/ballista/scheduler/scheduler_config_spec.toml
b/ballista/scheduler/scheduler_config_spec.toml
index 857212fe..804987d9 100644
--- a/ballista/scheduler/scheduler_config_spec.toml
+++ b/ballista/scheduler/scheduler_config_spec.toml
@@ -29,13 +29,6 @@ name = "advertise_flight_sql_endpoint"
type = "String"
doc = "Route for proxying flight results via scheduler. Should be of the form
'IP:PORT'"
-[[param]]
-abbr = "b"
-name = "cluster_backend"
-type = "ballista_scheduler::cluster::ClusterStorage"
-doc = "The configuration backend for the scheduler cluster state, possible
values: etcd, memory, sled. Default: sled"
-default = "ballista_scheduler::cluster::ClusterStorage::Sled"
-
[[param]]
abbr = "n"
name = "namespace"
@@ -43,13 +36,6 @@ type = "String"
doc = "Namespace for the ballista cluster that this executor will join.
Default: ballista"
default = "std::string::String::from(\"ballista\")"
-[[param]]
-abbr = "e"
-name = "etcd_urls"
-type = "String"
-doc = "etcd urls for use when discovery mode is `etcd`. Default:
localhost:2379"
-default = "std::string::String::from(\"localhost:2379\")"
-
[[param]]
name = "bind_host"
type = "String"
@@ -118,12 +104,6 @@ type = "String"
doc = "plugin dir"
default = "std::string::String::from(\"\")"
-[[param]]
-name = "sled_dir"
-type = "String"
-doc = "Sled dir: Opens a Db for saving scheduler metadata at the specified
path. This will create a new storage directory at the specified path if it does
not already exist."
-default = "std::string::String::from(\"\")"
-
[[param]]
name = "log_dir"
type = "String"
diff --git a/ballista/scheduler/src/bin/main.rs
b/ballista/scheduler/src/bin/main.rs
index d2e2c9ce..7d8b4b1b 100644
--- a/ballista/scheduler/src/bin/main.rs
+++ b/ballista/scheduler/src/bin/main.rs
@@ -26,7 +26,6 @@ use crate::config::{Config, ResultExt};
use ballista_core::config::LogRotationPolicy;
use ballista_core::print_version;
use ballista_scheduler::cluster::BallistaCluster;
-use ballista_scheduler::cluster::ClusterStorage;
use ballista_scheduler::config::{
ClusterStorageConfig, SchedulerConfig, TaskDistribution,
TaskDistributionPolicy,
};
@@ -116,22 +115,7 @@ async fn inner() -> Result<()> {
let addr = format!("{}:{}", opt.bind_host, opt.bind_port);
let addr = addr.parse()?;
- let cluster_storage_config = match opt.cluster_backend {
- ClusterStorage::Memory => ClusterStorageConfig::Memory,
- ClusterStorage::Etcd => ClusterStorageConfig::Etcd(
- opt.etcd_urls
- .split_whitespace()
- .map(|s| s.to_string())
- .collect(),
- ),
- ClusterStorage::Sled => {
- if opt.sled_dir.is_empty() {
- ClusterStorageConfig::Sled(None)
- } else {
- ClusterStorageConfig::Sled(Some(opt.sled_dir))
- }
- }
- };
+ let cluster_storage_config = ClusterStorageConfig::Memory;
let task_distribution = match opt.task_distribution {
TaskDistribution::Bias => TaskDistributionPolicy::Bias,
diff --git a/ballista/scheduler/src/cluster/kv.rs
b/ballista/scheduler/src/cluster/kv.rs
deleted file mode 100644
index 58e56d9e..00000000
--- a/ballista/scheduler/src/cluster/kv.rs
+++ /dev/null
@@ -1,817 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::cluster::storage::{KeyValueStore, Keyspace, Lock, Operation,
WatchEvent};
-use crate::cluster::{
- bind_task_bias, bind_task_consistent_hash, bind_task_round_robin,
get_scan_files,
- is_skip_consistent_hash, BoundTask, ClusterState, ExecutorHeartbeatStream,
- ExecutorSlot, JobState, JobStateEvent, JobStateEventStream, JobStatus,
- TaskDistributionPolicy, TopologyNode,
-};
-use crate::scheduler_server::{timestamp_secs, SessionBuilder};
-use crate::state::execution_graph::ExecutionGraph;
-use crate::state::session_manager::create_datafusion_context;
-use crate::state::task_manager::JobInfoCache;
-use crate::state::{decode_into, decode_protobuf};
-use async_trait::async_trait;
-use ballista_core::config::BallistaConfig;
-use ballista_core::consistent_hash::node::Node;
-use ballista_core::error::{BallistaError, Result};
-use ballista_core::serde::protobuf::job_status::Status;
-use ballista_core::serde::protobuf::{
- self, AvailableTaskSlots, ExecutorHeartbeat, ExecutorTaskSlots, FailedJob,
- KeyValuePair, QueuedJob,
-};
-use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
-use ballista_core::serde::BallistaCodec;
-use dashmap::DashMap;
-use datafusion::physical_plan::ExecutionPlan;
-use datafusion::prelude::SessionContext;
-use datafusion_proto::logical_plan::AsLogicalPlan;
-use datafusion_proto::physical_plan::AsExecutionPlan;
-use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode};
-use futures::StreamExt;
-use itertools::Itertools;
-use log::{error, info, warn};
-use prost::Message;
-use std::collections::{HashMap, HashSet};
-use std::future::Future;
-use std::sync::Arc;
-
-/// State implementation based on underlying `KeyValueStore`
-pub struct KeyValueState<
- S: KeyValueStore,
- T: 'static + AsLogicalPlan = LogicalPlanNode,
- U: 'static + AsExecutionPlan = PhysicalPlanNode,
-> {
- /// Underlying `KeyValueStore`
- store: S,
- /// ExecutorMetadata cache, executor_id -> ExecutorMetadata
- executors: Arc<DashMap<String, ExecutorMetadata>>,
- /// ExecutorHeartbeat cache, executor_id -> ExecutorHeartbeat
- executor_heartbeats: Arc<DashMap<String, ExecutorHeartbeat>>,
- /// Codec used to serialize/deserialize execution plan
- codec: BallistaCodec<T, U>,
- /// Name of current scheduler. Should be `{host}:{port}`
- #[allow(dead_code)]
- scheduler: String,
- /// In-memory store of queued jobs. Map from Job ID -> (Job Name,
queued_at timestamp)
- queued_jobs: DashMap<String, (String, u64)>,
- //// `SessionBuilder` for constructing `SessionContext` from stored
`BallistaConfig`
- session_builder: SessionBuilder,
-}
-
-impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>
- KeyValueState<S, T, U>
-{
- pub fn new(
- scheduler: impl Into<String>,
- store: S,
- codec: BallistaCodec<T, U>,
- session_builder: SessionBuilder,
- ) -> Self {
- Self {
- store,
- executors: Arc::new(DashMap::new()),
- executor_heartbeats: Arc::new(DashMap::new()),
- scheduler: scheduler.into(),
- codec,
- queued_jobs: DashMap::new(),
- session_builder,
- }
- }
-
- /// Initialize the set of active executor heartbeats from storage
- async fn init_active_executor_heartbeats(&self) -> Result<()> {
- let heartbeats = self.store.scan(Keyspace::Heartbeats, None).await?;
-
- for (_, value) in heartbeats {
- let data: ExecutorHeartbeat = decode_protobuf(&value)?;
- if let Some(protobuf::ExecutorStatus {
- status: Some(protobuf::executor_status::Status::Active(_)),
- }) = &data.status
- {
- self.executor_heartbeats
- .insert(data.executor_id.clone(), data);
- }
- }
-
- Ok(())
- }
-
- /// Return the stream of executor heartbeats observed by all schedulers in
the cluster.
- /// This can be aggregated to provide an eventually consistent view of all
executors within the cluster
- async fn executor_heartbeat_stream(&self) ->
Result<ExecutorHeartbeatStream> {
- let events = self
- .store
- .watch(Keyspace::Heartbeats, String::default())
- .await?;
-
- Ok(events
- .filter_map(|event| {
- futures::future::ready(match event {
- WatchEvent::Put(_, value) => {
- if let Ok(heartbeat) =
- decode_protobuf::<ExecutorHeartbeat>(&value)
- {
- Some(heartbeat)
- } else {
- None
- }
- }
- WatchEvent::Delete(_) => None,
- })
- })
- .boxed())
- }
-
- /// Get the topology nodes of the cluster for consistent hashing
- fn get_topology_nodes(
- &self,
- available_slots: &[AvailableTaskSlots],
- executors: Option<HashSet<String>>,
- ) -> HashMap<String, TopologyNode> {
- let mut nodes: HashMap<String, TopologyNode> = HashMap::new();
- for slots in available_slots {
- if let Some(executors) = executors.as_ref() {
- if !executors.contains(&slots.executor_id) {
- continue;
- }
- }
- if let Some(executor) = self.executors.get(&slots.executor_id) {
- let node = TopologyNode::new(
- &executor.host,
- executor.port,
- &slots.executor_id,
- self.executor_heartbeats
- .get(&executor.id)
- .map(|heartbeat| heartbeat.timestamp)
- .unwrap_or(0),
- slots.slots,
- );
- if let Some(existing_node) = nodes.get(node.name()) {
- if existing_node.last_seen_ts < node.last_seen_ts {
- nodes.insert(node.name().to_string(), node);
- }
- } else {
- nodes.insert(node.name().to_string(), node);
- }
- }
- }
- nodes
- }
-}
-
-#[async_trait]
-impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>
- ClusterState for KeyValueState<S, T, U>
-{
- /// Initialize a background process that will listen for executor
heartbeats and update the in-memory cache
- /// of executor heartbeats
- async fn init(&self) -> Result<()> {
- self.init_active_executor_heartbeats().await?;
-
- let mut heartbeat_stream = self.executor_heartbeat_stream().await?;
-
- info!("Initializing heartbeat listener");
-
- let heartbeats = self.executor_heartbeats.clone();
- let executors = self.executors.clone();
- tokio::task::spawn(async move {
- while let Some(heartbeat) = heartbeat_stream.next().await {
- let executor_id = heartbeat.executor_id.clone();
-
- match heartbeat
- .status
- .as_ref()
- .and_then(|status| status.status.as_ref())
- {
- Some(protobuf::executor_status::Status::Dead(_)) => {
- heartbeats.remove(&executor_id);
- executors.remove(&executor_id);
- }
- _ => {
- heartbeats.insert(executor_id, heartbeat);
- }
- }
- }
- });
-
- Ok(())
- }
-
- async fn bind_schedulable_tasks(
- &self,
- distribution: TaskDistributionPolicy,
- active_jobs: Arc<HashMap<String, JobInfoCache>>,
- executors: Option<HashSet<String>>,
- ) -> Result<Vec<BoundTask>> {
- let lock = self.store.lock(Keyspace::Slots, "global").await?;
-
- with_lock(lock, async {
- let resources = self.store.get(Keyspace::Slots, "all").await?;
-
- let mut slots =
- ExecutorTaskSlots::decode(resources.as_slice()).map_err(|err| {
- BallistaError::Internal(format!(
- "Unexpected value in executor slots state: {err:?}"
- ))
- })?;
-
- let available_slots: Vec<&mut AvailableTaskSlots> = slots
- .task_slots
- .iter_mut()
- .filter_map(|data| {
- (data.slots > 0
- && executors
- .as_ref()
- .map(|executors|
executors.contains(&data.executor_id))
- .unwrap_or(true))
- .then_some(data)
- })
- .collect();
-
- let bound_tasks = match distribution {
- TaskDistributionPolicy::Bias => {
- bind_task_bias(available_slots, active_jobs, |_|
false).await
- }
- TaskDistributionPolicy::RoundRobin => {
- bind_task_round_robin(available_slots, active_jobs, |_|
false).await
- }
- TaskDistributionPolicy::ConsistentHash {
- num_replicas,
- tolerance,
- } => {
- let mut bound_tasks = bind_task_round_robin(
- available_slots,
- active_jobs.clone(),
- |stage_plan: Arc<dyn ExecutionPlan>| {
- if let Ok(scan_files) = get_scan_files(stage_plan)
{
- // Should be opposite to consistent hash ones.
- !is_skip_consistent_hash(&scan_files)
- } else {
- false
- }
- },
- )
- .await;
- info!("{} tasks bound by round robin policy",
bound_tasks.len());
- let (bound_tasks_consistent_hash, ch_topology) =
- bind_task_consistent_hash(
- self.get_topology_nodes(&slots.task_slots,
executors),
- num_replicas,
- tolerance,
- active_jobs,
- |_, plan| get_scan_files(plan),
- )
- .await?;
- info!(
- "{} tasks bound by consistent hashing policy",
- bound_tasks_consistent_hash.len()
- );
- if !bound_tasks_consistent_hash.is_empty() {
- bound_tasks.extend(bound_tasks_consistent_hash);
- // Update the available slots
- let mut executor_data: HashMap<String,
AvailableTaskSlots> =
- slots
- .task_slots
- .into_iter()
- .map(|slots| (slots.executor_id.clone(),
slots))
- .collect();
- let ch_topology = ch_topology.unwrap();
- for node in ch_topology.nodes() {
- if let Some(data) =
executor_data.get_mut(&node.id) {
- data.slots = node.available_slots;
- } else {
- error!("Fail to find executor data for {}",
&node.id);
- }
- }
- slots.task_slots =
executor_data.into_values().collect();
- }
- bound_tasks
- }
- };
-
- if !bound_tasks.is_empty() {
- self.store
- .put(Keyspace::Slots, "all".to_owned(),
slots.encode_to_vec())
- .await?
- }
-
- Ok(bound_tasks)
- })
- .await
- }
-
- async fn unbind_tasks(&self, executor_slots: Vec<ExecutorSlot>) ->
Result<()> {
- let mut increments = HashMap::new();
- for (executor_id, num_slots) in executor_slots {
- let v = increments.entry(executor_id).or_insert_with(|| 0);
- *v += num_slots;
- }
-
- let lock = self.store.lock(Keyspace::Slots, "all").await?;
-
- with_lock(lock, async {
- let resources = self.store.get(Keyspace::Slots, "all").await?;
-
- let mut slots =
- ExecutorTaskSlots::decode(resources.as_slice()).map_err(|err| {
- BallistaError::Internal(format!(
- "Unexpected value in executor slots state: {err:?}"
- ))
- })?;
-
- for executor_slots in slots.task_slots.iter_mut() {
- if let Some(slots) =
increments.get(&executor_slots.executor_id) {
- executor_slots.slots += *slots;
- }
- }
-
- self.store
- .put(Keyspace::Slots, "all".to_string(), slots.encode_to_vec())
- .await
- })
- .await
- }
-
- async fn register_executor(
- &self,
- metadata: ExecutorMetadata,
- spec: ExecutorData,
- ) -> Result<()> {
- let executor_id = metadata.id.clone();
-
- //TODO this should be in a transaction
- // Now that we know we can connect, save the metadata and slots
- self.save_executor_metadata(metadata).await?;
- self.save_executor_heartbeat(ExecutorHeartbeat {
- executor_id: executor_id.clone(),
- timestamp: timestamp_secs(),
- metrics: vec![],
- status: Some(protobuf::ExecutorStatus {
- status: Some(
-
protobuf::executor_status::Status::Active(String::default()),
- ),
- }),
- })
- .await?;
-
- let available_slots = AvailableTaskSlots {
- executor_id,
- slots: spec.available_task_slots,
- };
-
- let lock = self.store.lock(Keyspace::Slots, "all").await?;
-
- with_lock(lock, async {
- let current_slots = self.store.get(Keyspace::Slots, "all").await?;
-
- let mut current_slots: ExecutorTaskSlots =
- decode_protobuf(current_slots.as_slice())?;
-
- if let Some((idx, _)) = current_slots
- .task_slots
- .iter()
- .find_position(|slots| slots.executor_id ==
available_slots.executor_id)
- {
- current_slots.task_slots[idx] = available_slots;
- } else {
- current_slots.task_slots.push(available_slots);
- }
-
- self.store
- .put(
- Keyspace::Slots,
- "all".to_string(),
- current_slots.encode_to_vec(),
- )
- .await
- })
- .await?;
-
- Ok(())
- }
-
- async fn save_executor_metadata(&self, metadata: ExecutorMetadata) ->
Result<()> {
- let executor_id = metadata.id.clone();
-
- let proto: protobuf::ExecutorMetadata = metadata.clone().into();
- self.store
- .put(
- Keyspace::Executors,
- executor_id.clone(),
- proto.encode_to_vec(),
- )
- .await?;
-
- self.executors.insert(executor_id, metadata);
-
- Ok(())
- }
-
- async fn get_executor_metadata(&self, executor_id: &str) ->
Result<ExecutorMetadata> {
- let metadata = if let Some(metadata) = self.executors.get(executor_id)
{
- metadata.value().clone()
- } else {
- let value = self.store.get(Keyspace::Executors,
executor_id).await?;
- if value.is_empty() {
- return Err(BallistaError::Internal(format!(
- "Executor {} not registered",
- executor_id
- )));
- }
- let decoded =
- decode_into::<protobuf::ExecutorMetadata,
ExecutorMetadata>(&value)?;
- self.executors
- .insert(executor_id.to_string(), decoded.clone());
-
- decoded
- };
-
- Ok(metadata)
- }
-
- async fn save_executor_heartbeat(&self, heartbeat: ExecutorHeartbeat) ->
Result<()> {
- let executor_id = heartbeat.executor_id.clone();
- self.store
- .put(
- Keyspace::Heartbeats,
- executor_id.clone(),
- heartbeat.clone().encode_to_vec(),
- )
- .await?;
- self.executor_heartbeats.insert(executor_id, heartbeat);
- Ok(())
- }
-
- async fn remove_executor(&self, executor_id: &str) -> Result<()> {
- let value = ExecutorHeartbeat {
- executor_id: executor_id.to_owned(),
- timestamp: timestamp_secs(),
- metrics: vec![],
- status: Some(protobuf::ExecutorStatus {
- status:
Some(protobuf::executor_status::Status::Dead("".to_string())),
- }),
- }
- .encode_to_vec();
-
- self.store
- .put(Keyspace::Heartbeats, executor_id.to_owned(), value)
- .await?;
- self.executor_heartbeats.remove(executor_id);
-
- // TODO Check the Executor reservation logic for push-based scheduling
-
- Ok(())
- }
-
- fn executor_heartbeats(&self) -> HashMap<String, ExecutorHeartbeat> {
- self.executor_heartbeats
- .iter()
- .map(|r| (r.key().clone(), r.value().clone()))
- .collect()
- }
-
- fn get_executor_heartbeat(&self, executor_id: &str) ->
Option<ExecutorHeartbeat> {
- self.executor_heartbeats
- .get(executor_id)
- .map(|r| r.value().clone())
- }
-}
-
-#[async_trait]
-impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> JobState
- for KeyValueState<S, T, U>
-{
- fn accept_job(&self, job_id: &str, job_name: &str, queued_at: u64) ->
Result<()> {
- self.queued_jobs
- .insert(job_id.to_string(), (job_name.to_string(), queued_at));
-
- Ok(())
- }
-
- fn pending_job_number(&self) -> usize {
- self.queued_jobs.len()
- }
-
- async fn submit_job(&self, job_id: String, graph: &ExecutionGraph) ->
Result<()> {
- if self.queued_jobs.get(&job_id).is_some() {
- let status = graph.status();
- let encoded_graph =
- ExecutionGraph::encode_execution_graph(graph.clone(),
&self.codec)?;
-
- self.store
- .apply_txn(vec![
- (
- Operation::Put(status.encode_to_vec()),
- Keyspace::JobStatus,
- job_id.clone(),
- ),
- (
- Operation::Put(encoded_graph.encode_to_vec()),
- Keyspace::ExecutionGraph,
- job_id.clone(),
- ),
- ])
- .await?;
-
- self.queued_jobs.remove(&job_id);
-
- Ok(())
- } else {
- Err(BallistaError::Internal(format!(
                "Failed to submit job {job_id}, job was not in queued jobs"
- )))
- }
- }
-
- async fn get_jobs(&self) -> Result<HashSet<String>> {
- self.store.scan_keys(Keyspace::JobStatus).await
- }
-
- async fn get_job_status(&self, job_id: &str) -> Result<Option<JobStatus>> {
- if let Some((job_name, queued_at)) =
self.queued_jobs.get(job_id).as_deref() {
- Ok(Some(JobStatus {
- job_id: job_id.to_string(),
- job_name: job_name.clone(),
- status: Some(Status::Queued(QueuedJob {
- queued_at: *queued_at,
- })),
- }))
- } else {
- let value = self.store.get(Keyspace::JobStatus, job_id).await?;
-
- (!value.is_empty())
- .then(|| decode_protobuf(value.as_slice()))
- .transpose()
- }
- }
-
- async fn get_execution_graph(&self, job_id: &str) ->
Result<Option<ExecutionGraph>> {
- let value = self.store.get(Keyspace::ExecutionGraph, job_id).await?;
-
- if value.is_empty() {
- return Ok(None);
- }
-
- let proto: protobuf::ExecutionGraph =
decode_protobuf(value.as_slice())?;
-
- let session = self.get_session(&proto.session_id).await?;
-
- Ok(Some(
- ExecutionGraph::decode_execution_graph(proto, &self.codec,
session.as_ref())
- .await?,
- ))
- }
-
- async fn save_job(&self, job_id: &str, graph: &ExecutionGraph) ->
Result<()> {
- let status = graph.status();
- let encoded_graph =
- ExecutionGraph::encode_execution_graph(graph.clone(),
&self.codec)?;
-
- self.store
- .apply_txn(vec![
- (
- Operation::Put(status.encode_to_vec()),
- Keyspace::JobStatus,
- job_id.to_string(),
- ),
- (
- Operation::Put(encoded_graph.encode_to_vec()),
- Keyspace::ExecutionGraph,
- job_id.to_string(),
- ),
- ])
- .await
- }
-
- async fn fail_unscheduled_job(&self, job_id: &str, reason: String) ->
Result<()> {
- if let Some((job_id, (job_name, queued_at))) =
self.queued_jobs.remove(job_id) {
- let status = JobStatus {
- job_id: job_id.clone(),
- job_name,
- status: Some(Status::Failed(FailedJob {
- error: reason,
- queued_at,
- started_at: 0,
- ended_at: 0,
- })),
- };
-
- self.store
- .put(Keyspace::JobStatus, job_id, status.encode_to_vec())
- .await
- } else {
- Err(BallistaError::Internal(format!(
- "Could not fail unscheduled job {job_id}, not found in queued
jobs"
- )))
- }
- }
-
- async fn remove_job(&self, job_id: &str) -> Result<()> {
- if self.queued_jobs.remove(job_id).is_none() {
- self.store
- .apply_txn(vec![
- (Operation::Delete, Keyspace::JobStatus,
job_id.to_string()),
- (
- Operation::Delete,
- Keyspace::ExecutionGraph,
- job_id.to_string(),
- ),
- ])
- .await
- } else {
- Ok(())
- }
- }
-
- async fn try_acquire_job(&self, _job_id: &str) ->
Result<Option<ExecutionGraph>> {
- Err(BallistaError::NotImplemented(
- "Work stealing is not currently implemented".to_string(),
- ))
- }
-
- async fn job_state_events(&self) -> Result<JobStateEventStream> {
- let watch = self
- .store
- .watch(Keyspace::JobStatus, String::default())
- .await?;
-
- let stream = watch
- .filter_map(|event| {
- futures::future::ready(match event {
- WatchEvent::Put(key, value) => {
- if let Some(job_id) =
Keyspace::JobStatus.strip_prefix(&key) {
- match JobStatus::decode(value.as_slice()) {
- Ok(status) => Some(JobStateEvent::JobUpdated {
- job_id: job_id.to_string(),
- status,
- }),
- Err(err) => {
- warn!(
- "Error decoding job status from watch
event: {err:?}"
- );
- None
- }
- }
- } else {
- None
- }
- }
- _ => None,
- })
- })
- .boxed();
-
- Ok(stream)
- }
-
- async fn get_session(&self, session_id: &str) ->
Result<Arc<SessionContext>> {
- let value = self.store.get(Keyspace::Sessions, session_id).await?;
-
- let settings: protobuf::SessionSettings = decode_protobuf(&value)?;
-
- let mut config_builder = BallistaConfig::builder();
- for kv_pair in &settings.configs {
- config_builder = config_builder.set(&kv_pair.key, &kv_pair.value);
- }
- let config = config_builder.build()?;
-
- Ok(create_datafusion_context(&config, self.session_builder))
- }
-
- async fn create_session(
- &self,
- config: &BallistaConfig,
- ) -> Result<Arc<SessionContext>> {
- let mut settings: Vec<KeyValuePair> = vec![];
-
- for (key, value) in config.settings() {
- settings.push(KeyValuePair {
- key: key.clone(),
- value: value.clone(),
- })
- }
-
- let value = protobuf::SessionSettings { configs: settings };
-
- let session = create_datafusion_context(config, self.session_builder);
-
- self.store
- .put(
- Keyspace::Sessions,
- session.session_id(),
- value.encode_to_vec(),
- )
- .await?;
-
- Ok(session)
- }
-
- async fn update_session(
- &self,
- session_id: &str,
- config: &BallistaConfig,
- ) -> Result<Arc<SessionContext>> {
- let mut settings: Vec<KeyValuePair> = vec![];
-
- for (key, value) in config.settings() {
- settings.push(KeyValuePair {
- key: key.clone(),
- value: value.clone(),
- })
- }
-
- let value = protobuf::SessionSettings { configs: settings };
- self.store
- .put(
- Keyspace::Sessions,
- session_id.to_owned(),
- value.encode_to_vec(),
- )
- .await?;
-
- Ok(create_datafusion_context(config, self.session_builder))
- }
-
- async fn remove_session(
- &self,
- session_id: &str,
- ) -> Result<Option<Arc<SessionContext>>> {
- let session_ctx = self.get_session(session_id).await.ok();
-
- self.store.delete(Keyspace::Sessions, session_id).await?;
-
- Ok(session_ctx)
- }
-}
-
-async fn with_lock<Out, F: Future<Output = Out>>(mut lock: Box<dyn Lock>, op:
F) -> Out {
- let result = op.await;
- lock.unlock().await;
- result
-}
-
-#[cfg(test)]
-mod test {
-
- use crate::cluster::kv::KeyValueState;
- use crate::cluster::storage::sled::SledClient;
- use crate::cluster::test_util::{test_job_lifecycle,
test_job_planning_failure};
- use crate::test_utils::{
- test_aggregation_plan, test_join_plan, test_two_aggregations_plan,
- };
- use ballista_core::error::Result;
- use ballista_core::serde::BallistaCodec;
- use ballista_core::utils::default_session_builder;
-
- #[cfg(feature = "sled")]
- #[tokio::test]
- async fn test_sled_job_lifecycle() -> Result<()> {
- test_job_lifecycle(make_sled_state()?,
test_aggregation_plan(4).await).await?;
- test_job_lifecycle(make_sled_state()?,
test_two_aggregations_plan(4).await)
- .await?;
- test_job_lifecycle(make_sled_state()?, test_join_plan(4).await).await?;
- Ok(())
- }
-
- #[cfg(feature = "sled")]
- #[tokio::test]
- async fn test_in_memory_job_planning_failure() -> Result<()> {
- test_job_planning_failure(make_sled_state()?,
test_aggregation_plan(4).await)
- .await?;
- test_job_planning_failure(
- make_sled_state()?,
- test_two_aggregations_plan(4).await,
- )
- .await?;
- test_job_planning_failure(make_sled_state()?,
test_join_plan(4).await).await?;
-
- Ok(())
- }
-
- #[cfg(feature = "sled")]
- fn make_sled_state() -> Result<KeyValueState<SledClient>> {
- Ok(KeyValueState::new(
- "",
- SledClient::try_new_temporary()?,
- BallistaCodec::default(),
- default_session_builder,
- ))
- }
-}
diff --git a/ballista/scheduler/src/cluster/mod.rs
b/ballista/scheduler/src/cluster/mod.rs
index b7489a25..5b9ecedf 100644
--- a/ballista/scheduler/src/cluster/mod.rs
+++ b/ballista/scheduler/src/cluster/mod.rs
@@ -28,36 +28,28 @@ use datafusion::datasource::physical_plan::{AvroExec,
CsvExec, NdJsonExec, Parqu
use datafusion::error::DataFusionError;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
-use datafusion_proto::logical_plan::AsLogicalPlan;
-use datafusion_proto::physical_plan::AsExecutionPlan;
use futures::Stream;
use log::{debug, info, warn};
use ballista_core::config::BallistaConfig;
use ballista_core::consistent_hash;
use ballista_core::consistent_hash::ConsistentHash;
-use ballista_core::error::{BallistaError, Result};
+use ballista_core::error::Result;
use ballista_core::serde::protobuf::{
job_status, AvailableTaskSlots, ExecutorHeartbeat, JobStatus,
};
use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata,
PartitionId};
-use ballista_core::serde::BallistaCodec;
use ballista_core::utils::default_session_builder;
-use crate::cluster::kv::KeyValueState;
use crate::cluster::memory::{InMemoryClusterState, InMemoryJobState};
-use crate::cluster::storage::etcd::EtcdClient;
-use crate::cluster::storage::sled::SledClient;
-use crate::cluster::storage::KeyValueStore;
+
use crate::config::{ClusterStorageConfig, SchedulerConfig,
TaskDistributionPolicy};
use crate::scheduler_server::SessionBuilder;
use crate::state::execution_graph::{create_task_info, ExecutionGraph,
TaskDescription};
use crate::state::task_manager::JobInfoCache;
pub mod event;
-pub mod kv;
pub mod memory;
-pub mod storage;
#[cfg(test)]
#[allow(clippy::uninlined_format_args)]
@@ -67,9 +59,7 @@ pub mod test_util;
// needs to be visible to code generated by configure_me
#[derive(Debug, Clone, ValueEnum, serde::Deserialize, PartialEq, Eq)]
pub enum ClusterStorage {
- Etcd,
Memory,
- Sled,
}
impl std::str::FromStr for ClusterStorage {
@@ -113,81 +103,10 @@ impl BallistaCluster {
}
}
- pub fn new_kv<
- S: KeyValueStore,
- T: 'static + AsLogicalPlan,
- U: 'static + AsExecutionPlan,
- >(
- store: S,
- scheduler: impl Into<String>,
- session_builder: SessionBuilder,
- codec: BallistaCodec<T, U>,
- ) -> Self {
- let kv_state =
- Arc::new(KeyValueState::new(scheduler, store, codec,
session_builder));
- Self {
- cluster_state: kv_state.clone(),
- job_state: kv_state,
- }
- }
-
pub async fn new_from_config(config: &SchedulerConfig) -> Result<Self> {
let scheduler = config.scheduler_name();
match &config.cluster_storage {
- #[cfg(feature = "etcd")]
- ClusterStorageConfig::Etcd(urls) => {
- let etcd = etcd_client::Client::connect(urls.as_slice(), None)
- .await
- .map_err(|err| {
- BallistaError::Internal(format!(
- "Could not connect to etcd: {err:?}"
- ))
- })?;
-
- Ok(Self::new_kv(
- EtcdClient::new(config.namespace.clone(), etcd),
- scheduler,
- default_session_builder,
- BallistaCodec::default(),
- ))
- }
- #[cfg(not(feature = "etcd"))]
- StateBackend::Etcd => {
- unimplemented!(
- "build the scheduler with the `etcd` feature to use the
etcd config backend"
- )
- }
- #[cfg(feature = "sled")]
- ClusterStorageConfig::Sled(dir) => {
- if let Some(dir) = dir.as_ref() {
- info!("Initializing Sled database in directory {}", dir);
- let sled = SledClient::try_new(dir)?;
-
- Ok(Self::new_kv(
- sled,
- scheduler,
- default_session_builder,
- BallistaCodec::default(),
- ))
- } else {
- info!("Initializing Sled database in temp directory");
- let sled = SledClient::try_new_temporary()?;
-
- Ok(Self::new_kv(
- sled,
- scheduler,
- default_session_builder,
- BallistaCodec::default(),
- ))
- }
- }
- #[cfg(not(feature = "sled"))]
- StateBackend::Sled => {
- unimplemented!(
- "build the scheduler with the `sled` feature to use the
sled config backend"
- )
- }
ClusterStorageConfig::Memory => Ok(BallistaCluster::new_memory(
scheduler,
default_session_builder,
diff --git a/ballista/scheduler/src/cluster/storage/etcd.rs
b/ballista/scheduler/src/cluster/storage/etcd.rs
deleted file mode 100644
index 514511c9..00000000
--- a/ballista/scheduler/src/cluster/storage/etcd.rs
+++ /dev/null
@@ -1,346 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::collections::HashSet;
-
-use std::task::Poll;
-
-use async_trait::async_trait;
-use ballista_core::error::{ballista_error, Result};
-use std::time::Instant;
-
-use crate::cluster::storage::KeyValueStore;
-use etcd_client::{
- GetOptions, LockOptions, LockResponse, Txn, TxnOp, WatchOptions,
WatchStream, Watcher,
-};
-use futures::{Stream, StreamExt};
-use log::{debug, error, warn};
-
-use crate::cluster::storage::{Keyspace, Lock, Operation, Watch, WatchEvent};
-
-/// A [`StateBackendClient`] implementation that uses etcd to save cluster
state.
-#[derive(Clone)]
-pub struct EtcdClient {
- namespace: String,
- etcd: etcd_client::Client,
-}
-
-impl EtcdClient {
- pub fn new(namespace: String, etcd: etcd_client::Client) -> Self {
- Self { namespace, etcd }
- }
-}
-
-#[async_trait]
-impl KeyValueStore for EtcdClient {
- async fn get(&self, keyspace: Keyspace, key: &str) -> Result<Vec<u8>> {
- let key = format!("/{}/{:?}/{}", self.namespace, keyspace, key);
-
- Ok(self
- .etcd
- .clone()
- .get(key, None)
- .await
- .map_err(|e| ballista_error(&format!("etcd error {e:?}")))?
- .kvs()
- .first()
- .map(|kv| kv.value().to_owned())
- .unwrap_or_default())
- }
-
- async fn get_from_prefix(
- &self,
- keyspace: Keyspace,
- prefix: &str,
- ) -> Result<Vec<(String, Vec<u8>)>> {
- let prefix = format!("/{}/{:?}/{}", self.namespace, keyspace, prefix);
-
- Ok(self
- .etcd
- .clone()
- .get(prefix, Some(GetOptions::new().with_prefix()))
- .await
- .map_err(|e| ballista_error(&format!("etcd error {e:?}")))?
- .kvs()
- .iter()
- .map(|kv| (kv.key_str().unwrap().to_owned(),
kv.value().to_owned()))
- .collect())
- }
-
- async fn scan(
- &self,
- keyspace: Keyspace,
- limit: Option<usize>,
- ) -> Result<Vec<(String, Vec<u8>)>> {
- let prefix = format!("/{}/{:?}/", self.namespace, keyspace);
-
- let options = if let Some(limit) = limit {
- GetOptions::new().with_prefix().with_limit(limit as i64)
- } else {
- GetOptions::new().with_prefix()
- };
-
- Ok(self
- .etcd
- .clone()
- .get(prefix, Some(options))
- .await
- .map_err(|e| ballista_error(&format!("etcd error {e:?}")))?
- .kvs()
- .iter()
- .map(|kv| (kv.key_str().unwrap().to_owned(),
kv.value().to_owned()))
- .collect())
- }
-
- async fn scan_keys(&self, keyspace: Keyspace) -> Result<HashSet<String>> {
- let prefix = format!("/{}/{:?}/", self.namespace, keyspace);
-
- let options = GetOptions::new().with_prefix().with_keys_only();
-
- Ok(self
- .etcd
- .clone()
- .get(prefix.clone(), Some(options))
- .await
- .map_err(|e| ballista_error(&format!("etcd error {e:?}")))?
- .kvs()
- .iter()
- .map(|kv| {
- kv.key_str()
- .unwrap()
- .strip_prefix(&prefix)
- .unwrap()
- .to_owned()
- })
- .collect())
- }
-
- async fn put(&self, keyspace: Keyspace, key: String, value: Vec<u8>) ->
Result<()> {
- let key = format!("/{}/{:?}/{}", self.namespace, keyspace, key);
-
- let mut etcd = self.etcd.clone();
- etcd.put(key, value.clone(), None)
- .await
- .map_err(|e| {
- warn!("etcd put failed: {}", e);
- ballista_error(&format!("etcd put failed: {e}"))
- })
- .map(|_| ())
- }
-
- /// Apply multiple operations in a single transaction.
- async fn apply_txn(&self, ops: Vec<(Operation, Keyspace, String)>) ->
Result<()> {
- let mut etcd = self.etcd.clone();
-
- let txn_ops: Vec<TxnOp> = ops
- .into_iter()
- .map(|(operation, ks, key)| {
- let key = format!("/{}/{:?}/{}", self.namespace, ks, key);
- match operation {
- Operation::Put(value) => TxnOp::put(key, value, None),
- Operation::Delete => TxnOp::delete(key, None),
- }
- })
- .collect();
-
- etcd.txn(Txn::new().and_then(txn_ops))
- .await
- .map_err(|e| {
- error!("etcd operation failed: {}", e);
- ballista_error(&format!("etcd operation failed: {e}"))
- })
- .map(|_| ())
- }
-
- async fn mv(
- &self,
- from_keyspace: Keyspace,
- to_keyspace: Keyspace,
- key: &str,
- ) -> Result<()> {
- let mut etcd = self.etcd.clone();
- let from_key = format!("/{}/{:?}/{}", self.namespace, from_keyspace,
key);
- let to_key = format!("/{}/{:?}/{}", self.namespace, to_keyspace, key);
-
- let current_value = etcd
- .get(from_key.as_str(), None)
- .await
- .map_err(|e| ballista_error(&format!("etcd error {e:?}")))?
- .kvs()
- .first()
- .map(|kv| kv.value().to_owned());
-
- if let Some(value) = current_value {
- let txn = Txn::new().and_then(vec![
- TxnOp::delete(from_key.as_str(), None),
- TxnOp::put(to_key.as_str(), value, None),
- ]);
- etcd.txn(txn).await.map_err(|e| {
- error!("etcd put failed: {}", e);
- ballista_error("etcd move failed")
- })?;
- } else {
- warn!("Cannot move value at {}, does not exist", from_key);
- }
-
- Ok(())
- }
-
- async fn lock(&self, keyspace: Keyspace, key: &str) -> Result<Box<dyn
Lock>> {
- let start = Instant::now();
- let mut etcd = self.etcd.clone();
-
- let lock_id = format!("/{}/mutex/{:?}/{}", self.namespace, keyspace,
key);
-
- // Create a lease which expires after 30 seconds. We then associate
this lease with the lock
- // acquired below. This protects against a scheduler dying
unexpectedly while holding locks
- // on shared resources. In that case, those locks would expire once
the lease expires.
- // TODO This is not great to do for every lock. We should have a
single lease per scheduler instance
- let lease_id = etcd
- .lease_client()
- .grant(30, None)
- .await
- .map_err(|e| {
- warn!("etcd lease failed: {}", e);
- ballista_error("etcd lease failed")
- })?
- .id();
-
- let lock_options = LockOptions::new().with_lease(lease_id);
-
- let lock = etcd
- .lock(lock_id.as_str(), Some(lock_options))
- .await
- .map_err(|e| {
- warn!("etcd lock failed: {}", e);
- ballista_error("etcd lock failed")
- })?;
-
- let elapsed = start.elapsed();
- debug!("Acquired lock {} in {:?}", lock_id, elapsed);
- Ok(Box::new(EtcdLockGuard { etcd, lock }))
- }
-
- async fn watch(&self, keyspace: Keyspace, prefix: String) ->
Result<Box<dyn Watch>> {
- let prefix = format!("/{}/{:?}/{}", self.namespace, keyspace, prefix);
-
- let mut etcd = self.etcd.clone();
- let options = WatchOptions::new().with_prefix();
- let (watcher, stream) = etcd.watch(prefix,
Some(options)).await.map_err(|e| {
- warn!("etcd watch failed: {}", e);
- ballista_error("etcd watch failed")
- })?;
- Ok(Box::new(EtcdWatch {
- watcher,
- stream,
- buffered_events: Vec::new(),
- }))
- }
-
- async fn delete(&self, keyspace: Keyspace, key: &str) -> Result<()> {
- let key = format!("/{}/{:?}/{}", self.namespace, keyspace, key);
-
- let mut etcd = self.etcd.clone();
-
- etcd.delete(key, None).await.map_err(|e| {
- warn!("etcd delete failed: {:?}", e);
- ballista_error("etcd delete failed")
- })?;
-
- Ok(())
- }
-}
-
-struct EtcdWatch {
- watcher: Watcher,
- stream: WatchStream,
- buffered_events: Vec<WatchEvent>,
-}
-
-#[tonic::async_trait]
-impl Watch for EtcdWatch {
- async fn cancel(&mut self) -> Result<()> {
- self.watcher.cancel().await.map_err(|e| {
- warn!("etcd watch cancel failed: {}", e);
- ballista_error("etcd watch cancel failed")
- })
- }
-}
-
-impl Stream for EtcdWatch {
- type Item = WatchEvent;
-
- fn poll_next(
- self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> Poll<Option<Self::Item>> {
- let self_mut = self.get_mut();
- if let Some(event) = self_mut.buffered_events.pop() {
- Poll::Ready(Some(event))
- } else {
- loop {
- match self_mut.stream.poll_next_unpin(cx) {
- Poll::Ready(Some(Err(e))) => {
- warn!("Error when watching etcd prefix: {}", e);
- continue;
- }
- Poll::Ready(Some(Ok(v))) => {
-
self_mut.buffered_events.extend(v.events().iter().map(|ev| {
- match ev.event_type() {
- etcd_client::EventType::Put => {
- let kv = ev.kv().unwrap();
- WatchEvent::Put(
- kv.key_str().unwrap().to_string(),
- kv.value().to_owned(),
- )
- }
- etcd_client::EventType::Delete => {
- let kv = ev.kv().unwrap();
-
WatchEvent::Delete(kv.key_str().unwrap().to_string())
- }
- }
- }));
- if let Some(event) = self_mut.buffered_events.pop() {
- return Poll::Ready(Some(event));
- } else {
- continue;
- }
- }
- Poll::Ready(None) => return Poll::Ready(None),
- Poll::Pending => return Poll::Pending,
- }
- }
- }
- }
-
- fn size_hint(&self) -> (usize, Option<usize>) {
- self.stream.size_hint()
- }
-}
-
-struct EtcdLockGuard {
- etcd: etcd_client::Client,
- lock: LockResponse,
-}
-
-// Cannot use Drop because we need this to be async
-#[tonic::async_trait]
-impl Lock for EtcdLockGuard {
- async fn unlock(&mut self) {
- self.etcd.unlock(self.lock.key()).await.unwrap();
- }
-}
diff --git a/ballista/scheduler/src/cluster/storage/mod.rs
b/ballista/scheduler/src/cluster/storage/mod.rs
deleted file mode 100644
index 6aa049c0..00000000
--- a/ballista/scheduler/src/cluster/storage/mod.rs
+++ /dev/null
@@ -1,140 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#[cfg(feature = "etcd")]
-pub mod etcd;
-#[cfg(feature = "sled")]
-pub mod sled;
-
-use async_trait::async_trait;
-use ballista_core::error::Result;
-use futures::{future, Stream};
-use std::collections::HashSet;
-use tokio::sync::OwnedMutexGuard;
-
-#[derive(Debug, Clone, Eq, PartialEq, Hash)]
-pub enum Keyspace {
- Executors,
- JobStatus,
- ExecutionGraph,
- Slots,
- Sessions,
- Heartbeats,
-}
-
-impl Keyspace {
- pub fn strip_prefix<'a>(&'a self, key: &'a str) -> Option<&'a str> {
- key.strip_prefix(&format!("{self:?}/"))
- }
-}
-
-#[derive(Debug, Eq, PartialEq, Hash)]
-pub enum Operation {
- Put(Vec<u8>),
- Delete,
-}
-
-/// A trait that defines a KeyValue interface with basic locking primitives
for persisting Ballista cluster state
-#[async_trait]
-pub trait KeyValueStore: Send + Sync + Clone + 'static {
- /// Retrieve the data associated with a specific key in a given keyspace.
- ///
- /// An empty vec is returned if the key does not exist.
- async fn get(&self, keyspace: Keyspace, key: &str) -> Result<Vec<u8>>;
-
- /// Retrieve all key/value pairs in given keyspace matching a given key
prefix.
- async fn get_from_prefix(
- &self,
- keyspace: Keyspace,
- prefix: &str,
- ) -> Result<Vec<(String, Vec<u8>)>>;
-
- /// Retrieve all key/value pairs in a given keyspace. If a limit is
specified, will return at
- /// most `limit` key-value pairs.
- async fn scan(
- &self,
- keyspace: Keyspace,
- limit: Option<usize>,
- ) -> Result<Vec<(String, Vec<u8>)>>;
-
- /// Retrieve all keys from a given keyspace (without their values). The
implementations
- /// should handle stripping any prefixes it may add.
- async fn scan_keys(&self, keyspace: Keyspace) -> Result<HashSet<String>>;
-
- /// Saves the value into the provided key, overriding any previous data
that might have been associated to that key.
- async fn put(&self, keyspace: Keyspace, key: String, value: Vec<u8>) ->
Result<()>;
-
- /// Bundle multiple operation in a single transaction. Either all values
should be saved, or all should fail.
- /// It can support multiple types of operations and keyspaces. If the
count of the unique keyspace is more than one,
- /// more than one locks has to be acquired.
- async fn apply_txn(&self, ops: Vec<(Operation, Keyspace, String)>) ->
Result<()>;
- /// Acquire mutex with specified IDs.
- async fn acquire_locks(
- &self,
- mut ids: Vec<(Keyspace, &str)>,
- ) -> Result<Vec<Box<dyn Lock>>> {
- // We always acquire locks in a specific order to avoid deadlocks.
- ids.sort_by_key(|n| format!("/{:?}/{}", n.0, n.1));
- future::try_join_all(ids.into_iter().map(|(ks, key)| self.lock(ks,
key))).await
- }
-
- /// Atomically move the given key from one keyspace to another
- async fn mv(
- &self,
- from_keyspace: Keyspace,
- to_keyspace: Keyspace,
- key: &str,
- ) -> Result<()>;
-
- /// Acquire mutex with specified ID.
- async fn lock(&self, keyspace: Keyspace, key: &str) -> Result<Box<dyn
Lock>>;
-
- /// Watch all events that happen on a specific prefix.
- async fn watch(
- &self,
- keyspace: Keyspace,
- prefix: String,
- ) -> Result<Box<dyn Watch<Item = WatchEvent>>>;
-
- /// Permanently delete a key from state
- async fn delete(&self, keyspace: Keyspace, key: &str) -> Result<()>;
-}
-
-/// A Watch is a cancelable stream of put or delete events in the
[StateBackendClient]
-#[async_trait]
-pub trait Watch: Stream<Item = WatchEvent> + Send + Unpin {
- async fn cancel(&mut self) -> Result<()>;
-}
-
-#[derive(Clone, Debug, Eq, PartialEq)]
-pub enum WatchEvent {
- /// Contains the inserted or updated key and the new value
- Put(String, Vec<u8>),
-
- /// Contains the deleted key
- Delete(String),
-}
-
-#[async_trait]
-pub trait Lock: Send + Sync {
- async fn unlock(&mut self);
-}
-
-#[async_trait]
-impl<T: Send + Sync> Lock for OwnedMutexGuard<T> {
- async fn unlock(&mut self) {}
-}
diff --git a/ballista/scheduler/src/cluster/storage/sled.rs
b/ballista/scheduler/src/cluster/storage/sled.rs
deleted file mode 100644
index 6be4c504..00000000
--- a/ballista/scheduler/src/cluster/storage/sled.rs
+++ /dev/null
@@ -1,395 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::collections::{HashMap, HashSet};
-use std::{sync::Arc, task::Poll};
-
-use ballista_core::error::{ballista_error, BallistaError, Result};
-
-use crate::cluster::storage::KeyValueStore;
-use async_trait::async_trait;
-use futures::{FutureExt, Stream};
-use log::warn;
-use sled_package as sled;
-use tokio::sync::Mutex;
-
-use crate::cluster::storage::{Keyspace, Lock, Operation, Watch, WatchEvent};
-
-/// A [`StateBackendClient`] implementation that uses file-based storage to
save cluster state.
-#[derive(Clone)]
-pub struct SledClient {
- db: sled::Db,
- locks: Arc<Mutex<HashMap<String, Arc<Mutex<()>>>>>,
-}
-
-impl SledClient {
- /// Creates a SledClient that saves data to the specified file.
- pub fn try_new<P: AsRef<std::path::Path>>(path: P) -> Result<Self> {
- Ok(Self {
- db: sled::open(path).map_err(sled_to_ballista_error)?,
- locks: Arc::new(Mutex::new(HashMap::new())),
- })
- }
-
- /// Creates a SledClient that saves data to a temp file.
- pub fn try_new_temporary() -> Result<Self> {
- Ok(Self {
- db: sled::Config::new()
- .temporary(true)
- .open()
- .map_err(sled_to_ballista_error)?,
- locks: Arc::new(Mutex::new(HashMap::new())),
- })
- }
-}
-
-fn sled_to_ballista_error(e: sled::Error) -> BallistaError {
- match e {
- sled::Error::Io(io) => BallistaError::IoError(io),
- _ => BallistaError::General(format!("{e}")),
- }
-}
-
-#[async_trait]
-impl KeyValueStore for SledClient {
- async fn get(&self, keyspace: Keyspace, key: &str) -> Result<Vec<u8>> {
- let key = format!("/{keyspace:?}/{key}");
- Ok(self
- .db
- .get(key)
- .map_err(|e| ballista_error(&format!("sled error {e:?}")))?
- .map(|v| v.to_vec())
- .unwrap_or_default())
- }
-
- async fn get_from_prefix(
- &self,
- keyspace: Keyspace,
- prefix: &str,
- ) -> Result<Vec<(String, Vec<u8>)>> {
- let prefix = format!("/{keyspace:?}/{prefix}");
- Ok(self
- .db
- .scan_prefix(prefix)
- .map(|v| {
- v.map(|(key, value)| {
- (
- std::str::from_utf8(&key).unwrap().to_owned(),
- value.to_vec(),
- )
- })
- })
- .collect::<std::result::Result<Vec<_>, _>>()
- .map_err(|e| ballista_error(&format!("sled error {e:?}")))?)
- }
-
- async fn scan(
- &self,
- keyspace: Keyspace,
- limit: Option<usize>,
- ) -> Result<Vec<(String, Vec<u8>)>> {
- let prefix = format!("/{keyspace:?}/");
- if let Some(limit) = limit {
- Ok(self
- .db
- .scan_prefix(prefix)
- .take(limit)
- .map(|v| {
- v.map(|(key, value)| {
- (
- std::str::from_utf8(&key).unwrap().to_owned(),
- value.to_vec(),
- )
- })
- })
- .collect::<std::result::Result<Vec<_>, _>>()
- .map_err(|e| ballista_error(&format!("sled error {e:?}")))?)
- } else {
- Ok(self
- .db
- .scan_prefix(prefix)
- .map(|v| {
- v.map(|(key, value)| {
- (
- std::str::from_utf8(&key).unwrap().to_owned(),
- value.to_vec(),
- )
- })
- })
- .collect::<std::result::Result<Vec<_>, _>>()
- .map_err(|e| ballista_error(&format!("sled error {e:?}")))?)
- }
- }
-
- async fn scan_keys(&self, keyspace: Keyspace) -> Result<HashSet<String>> {
- let prefix = format!("/{keyspace:?}/");
- Ok(self
- .db
- .scan_prefix(prefix.clone())
- .map(|v| {
- v.map(|(key, _value)| {
- std::str::from_utf8(&key)
- .unwrap()
- .strip_prefix(&prefix)
- .unwrap()
- .to_owned()
- })
- })
- .collect::<std::result::Result<HashSet<_>, _>>()
- .map_err(|e| ballista_error(&format!("sled error {e:?}")))?)
- }
-
- async fn put(&self, keyspace: Keyspace, key: String, value: Vec<u8>) ->
Result<()> {
- let key = format!("/{keyspace:?}/{key}");
- self.db
- .insert(key, value)
- .map_err(|e| {
- warn!("sled insert failed: {}", e);
- ballista_error("sled insert failed")
- })
- .map(|_| ())
- }
-
- async fn apply_txn(&self, ops: Vec<(Operation, Keyspace, String)>) ->
Result<()> {
- let mut batch = sled::Batch::default();
-
- for (op, keyspace, key_str) in ops {
- let key = format!("/{:?}/{}", &keyspace, key_str);
- match op {
- Operation::Put(value) => batch.insert(key.as_str(), value),
- Operation::Delete => batch.remove(key.as_str()),
- }
- }
-
- self.db.apply_batch(batch).map_err(|e| {
- warn!("sled transaction insert failed: {}", e);
- ballista_error("sled operations failed")
- })
- }
-
- async fn mv(
- &self,
- from_keyspace: Keyspace,
- to_keyspace: Keyspace,
- key: &str,
- ) -> Result<()> {
- let from_key = format!("/{from_keyspace:?}/{key}");
- let to_key = format!("/{to_keyspace:?}/{key}");
-
- let current_value = self
- .db
- .get(from_key.as_str())
- .map_err(|e| ballista_error(&format!("sled error {e:?}")))?
- .map(|v| v.to_vec());
-
- if let Some(value) = current_value {
- let mut batch = sled::Batch::default();
-
- batch.remove(from_key.as_str());
- batch.insert(to_key.as_str(), value);
-
- self.db.apply_batch(batch).map_err(|e| {
- warn!("sled transaction insert failed: {}", e);
- ballista_error("sled insert failed")
- })
- } else {
- // TODO should this return an error?
- warn!("Cannot move value at {}, does not exist", from_key);
- Ok(())
- }
- }
-
- async fn lock(&self, keyspace: Keyspace, key: &str) -> Result<Box<dyn
Lock>> {
- let mut mlock = self.locks.lock().await;
- let lock_key = format!("/{keyspace:?}/{key}");
- if let Some(lock) = mlock.get(&lock_key) {
- Ok(Box::new(lock.clone().lock_owned().await))
- } else {
- let new_lock = Arc::new(Mutex::new(()));
- mlock.insert(lock_key, new_lock.clone());
- Ok(Box::new(new_lock.lock_owned().await))
- }
- }
-
- async fn watch(&self, keyspace: Keyspace, prefix: String) ->
Result<Box<dyn Watch>> {
- let prefix = format!("/{keyspace:?}/{prefix}");
-
- Ok(Box::new(SledWatch {
- subscriber: self.db.watch_prefix(prefix),
- }))
- }
-
- async fn delete(&self, keyspace: Keyspace, key: &str) -> Result<()> {
- let key = format!("/{keyspace:?}/{key}");
- self.db.remove(key).map_err(|e| {
- warn!("sled delete failed: {:?}", e);
- ballista_error("sled delete failed")
- })?;
- Ok(())
- }
-}
-
-struct SledWatch {
- subscriber: sled::Subscriber,
-}
-
-#[tonic::async_trait]
-impl Watch for SledWatch {
- async fn cancel(&mut self) -> Result<()> {
- Ok(())
- }
-}
-
-impl Stream for SledWatch {
- type Item = WatchEvent;
-
- fn poll_next(
- self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> Poll<Option<Self::Item>> {
- match self.get_mut().subscriber.poll_unpin(cx) {
- Poll::Pending => Poll::Pending,
- Poll::Ready(None) => Poll::Ready(None),
- Poll::Ready(Some(sled::Event::Insert { key, value })) => {
- let key = std::str::from_utf8(&key).unwrap().to_owned();
- Poll::Ready(Some(WatchEvent::Put(key, value.to_vec())))
- }
- Poll::Ready(Some(sled::Event::Remove { key })) => {
- let key = std::str::from_utf8(&key).unwrap().to_owned();
- Poll::Ready(Some(WatchEvent::Delete(key)))
- }
- }
- }
-
- fn size_hint(&self) -> (usize, Option<usize>) {
- self.subscriber.size_hint()
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::{KeyValueStore, SledClient, Watch, WatchEvent};
-
- use crate::cluster::storage::{Keyspace, Operation};
-
- use futures::StreamExt;
- use std::result::Result;
-
- fn create_instance() -> Result<SledClient, Box<dyn std::error::Error>> {
- Ok(SledClient::try_new_temporary()?)
- }
-
- #[tokio::test]
- async fn put_read() -> Result<(), Box<dyn std::error::Error>> {
- let client = create_instance()?;
- let key = "key";
- let value = "value".as_bytes();
- client
- .put(Keyspace::Slots, key.to_owned(), value.to_vec())
- .await?;
- assert_eq!(client.get(Keyspace::Slots, key).await?, value);
- Ok(())
- }
-
- #[tokio::test]
- async fn multiple_operation() -> Result<(), Box<dyn std::error::Error>> {
- let client = create_instance()?;
- let key = "key".to_string();
- let value = "value".as_bytes().to_vec();
- {
- let _locks = client
- .acquire_locks(vec![(Keyspace::JobStatus, ""),
(Keyspace::Slots, "")])
- .await?;
-
- let txn_ops = vec![
- (Operation::Put(value.clone()), Keyspace::Slots, key.clone()),
- (
- Operation::Put(value.clone()),
- Keyspace::JobStatus,
- key.clone(),
- ),
- ];
- client.apply_txn(txn_ops).await?;
- }
-
- assert_eq!(client.get(Keyspace::Slots, key.as_str()).await?, value);
- assert_eq!(client.get(Keyspace::JobStatus, key.as_str()).await?,
value);
- Ok(())
- }
-
- #[tokio::test]
- async fn read_empty() -> Result<(), Box<dyn std::error::Error>> {
- let client = create_instance()?;
- let key = "key";
- let empty: &[u8] = &[];
- assert_eq!(client.get(Keyspace::Slots, key).await?, empty);
- Ok(())
- }
-
- #[tokio::test]
- async fn read_prefix() -> Result<(), Box<dyn std::error::Error>> {
- let client = create_instance()?;
- let key = "key";
- let value = "value".as_bytes();
- client
- .put(Keyspace::Slots, format!("{key}/1"), value.to_vec())
- .await?;
- client
- .put(Keyspace::Slots, format!("{key}/2"), value.to_vec())
- .await?;
- assert_eq!(
- client.get_from_prefix(Keyspace::Slots, key).await?,
- vec![
- ("/Slots/key/1".to_owned(), value.to_vec()),
- ("/Slots/key/2".to_owned(), value.to_vec())
- ]
- );
- Ok(())
- }
-
- #[tokio::test]
- async fn read_watch() -> Result<(), Box<dyn std::error::Error>> {
- let client = create_instance()?;
- let key = "key";
- let value = "value".as_bytes();
- let mut watch: Box<dyn Watch<Item = WatchEvent>> =
- client.watch(Keyspace::Slots, key.to_owned()).await?;
- client
- .put(Keyspace::Slots, key.to_owned(), value.to_vec())
- .await?;
- assert_eq!(
- watch.next().await,
- Some(WatchEvent::Put(
- format!("/{:?}/{}", Keyspace::Slots, key.to_owned()),
- value.to_owned()
- ))
- );
- let value2 = "value2".as_bytes();
- client
- .put(Keyspace::Slots, key.to_owned(), value2.to_vec())
- .await?;
- assert_eq!(
- watch.next().await,
- Some(WatchEvent::Put(
- format!("/{:?}/{}", Keyspace::Slots, key.to_owned()),
- value2.to_owned()
- ))
- );
- watch.cancel().await?;
- Ok(())
- }
-}
diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs
index 82280911..ce542e51 100644
--- a/ballista/scheduler/src/config.rs
+++ b/ballista/scheduler/src/config.rs
@@ -180,10 +180,6 @@ impl SchedulerConfig {
#[derive(Clone, Debug)]
pub enum ClusterStorageConfig {
Memory,
- #[cfg(feature = "etcd")]
- Etcd(Vec<String>),
- #[cfg(feature = "sled")]
- Sled(Option<String>),
}
/// Policy of distributing tasks to available executor slots
diff --git a/ballista/scheduler/src/lib.rs b/ballista/scheduler/src/lib.rs
index cd6f047b..946e475f 100644
--- a/ballista/scheduler/src/lib.rs
+++ b/ballista/scheduler/src/lib.rs
@@ -25,7 +25,6 @@ pub mod metrics;
pub mod planner;
pub mod scheduler_process;
pub mod scheduler_server;
-#[cfg(feature = "sled")]
pub mod standalone;
pub mod state;
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs
b/ballista/scheduler/src/scheduler_server/grpc.rs
index 6992bf75..0d7d5e36 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -643,7 +643,7 @@ fn extract_connect_info<T>(request: &Request<T>) ->
Option<ConnectInfo<SocketAdd
.cloned()
}
-#[cfg(all(test, feature = "sled"))]
+#[cfg(test)]
mod test {
use std::sync::Arc;
use std::time::Duration;
diff --git a/ballista/scheduler/src/scheduler_server/mod.rs
b/ballista/scheduler/src/scheduler_server/mod.rs
index c2bf657b..e6e6739e 100644
--- a/ballista/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/scheduler/src/scheduler_server/mod.rs
@@ -337,7 +337,7 @@ pub fn timestamp_millis() -> u64 {
.as_millis() as u64
}
-#[cfg(all(test, feature = "sled"))]
+#[cfg(test)]
mod test {
use std::sync::Arc;
diff --git a/ballista/scheduler/src/standalone.rs
b/ballista/scheduler/src/standalone.rs
index cb74b2bf..bb6d7006 100644
--- a/ballista/scheduler/src/standalone.rs
+++ b/ballista/scheduler/src/standalone.rs
@@ -18,7 +18,7 @@
use crate::cluster::BallistaCluster;
use crate::config::SchedulerConfig;
use crate::metrics::default_metrics_collector;
-use crate::{cluster::storage::sled::SledClient,
scheduler_server::SchedulerServer};
+use crate::scheduler_server::SchedulerServer;
use ballista_core::serde::BallistaCodec;
use ballista_core::utils::{create_grpc_server, default_session_builder};
use ballista_core::{
@@ -35,12 +35,7 @@ use tokio::net::TcpListener;
pub async fn new_standalone_scheduler() -> Result<SocketAddr> {
let metrics_collector = default_metrics_collector()?;
- let cluster = BallistaCluster::new_kv(
- SledClient::try_new_temporary()?,
- "localhost:50050",
- default_session_builder,
- BallistaCodec::default(),
- );
+ let cluster = BallistaCluster::new_memory("localhost:50050",
default_session_builder);
let mut scheduler_server: SchedulerServer<LogicalPlanNode,
PhysicalPlanNode> =
SchedulerServer::new(
diff --git a/ballista/scheduler/src/state/execution_graph.rs
b/ballista/scheduler/src/state/execution_graph.rs
index 333545d3..e72e3906 100644
--- a/ballista/scheduler/src/state/execution_graph.rs
+++ b/ballista/scheduler/src/state/execution_graph.rs
@@ -24,25 +24,20 @@ use std::time::{SystemTime, UNIX_EPOCH};
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{accept, ExecutionPlan, ExecutionPlanVisitor};
-use datafusion::prelude::SessionContext;
-use datafusion_proto::logical_plan::AsLogicalPlan;
use log::{error, info, warn};
use ballista_core::error::{BallistaError, Result};
use ballista_core::execution_plans::{ShuffleWriterExec, UnresolvedShuffleExec};
use ballista_core::serde::protobuf::failed_task::FailedReason;
use ballista_core::serde::protobuf::job_status::Status;
-use ballista_core::serde::protobuf::{
- self, execution_graph_stage::StageType, FailedTask, JobStatus, ResultLost,
- RunningJob, SuccessfulJob, TaskStatus,
-};
use ballista_core::serde::protobuf::{job_status, FailedJob,
ShuffleWritePartition};
use ballista_core::serde::protobuf::{task_status, RunningTask};
+use ballista_core::serde::protobuf::{
+ FailedTask, JobStatus, ResultLost, RunningJob, SuccessfulJob, TaskStatus,
+};
use ballista_core::serde::scheduler::{
ExecutorMetadata, PartitionId, PartitionLocation, PartitionStats,
};
-use ballista_core::serde::BallistaCodec;
-use datafusion_proto::physical_plan::AsExecutionPlan;
use crate::display::print_stage_metrics;
use crate::planner::DistributedPlanner;
@@ -50,8 +45,7 @@ use crate::scheduler_server::event::QueryStageSchedulerEvent;
use crate::scheduler_server::timestamp_millis;
use crate::state::execution_graph::execution_stage::RunningStage;
pub(crate) use crate::state::execution_graph::execution_stage::{
- ExecutionStage, FailedStage, ResolvedStage, StageOutput, SuccessfulStage,
TaskInfo,
- UnresolvedStage,
+ ExecutionStage, ResolvedStage, StageOutput, TaskInfo, UnresolvedStage,
};
use crate::state::task_manager::UpdatedStages;
@@ -103,6 +97,7 @@ mod execution_stage;
#[derive(Clone)]
pub struct ExecutionGraph {
/// Curator scheduler name. Can be `None` is `ExecutionGraph` is not
currently curated by any scheduler
+ #[allow(dead_code)] // not used at the moment, will be used later
scheduler_id: Option<String>,
/// ID for this job
job_id: String,
@@ -121,6 +116,7 @@ pub struct ExecutionGraph {
/// Map from Stage ID -> ExecutionStage
stages: HashMap<usize, ExecutionStage>,
/// Total number fo output partitions
+ #[allow(dead_code)] // not used at the moment, will be used later
output_partitions: usize,
/// Locations of this `ExecutionGraph` final output locations
output_locations: Vec<PartitionLocation>,
@@ -1317,163 +1313,6 @@ impl ExecutionGraph {
fn clear_stage_failure(&mut self, stage_id: usize) {
self.failed_stage_attempts.remove(&stage_id);
}
-
- pub(crate) async fn decode_execution_graph<
- T: 'static + AsLogicalPlan,
- U: 'static + AsExecutionPlan,
- >(
- proto: protobuf::ExecutionGraph,
- codec: &BallistaCodec<T, U>,
- session_ctx: &SessionContext,
- ) -> Result<ExecutionGraph> {
- let mut stages: HashMap<usize, ExecutionStage> = HashMap::new();
- for graph_stage in proto.stages {
- let stage_type = graph_stage.stage_type.expect("Unexpected empty
stage");
-
- let execution_stage = match stage_type {
- StageType::UnresolvedStage(stage) => {
- let stage: UnresolvedStage =
- UnresolvedStage::decode(stage, codec, session_ctx)?;
- (stage.stage_id, ExecutionStage::UnResolved(stage))
- }
- StageType::ResolvedStage(stage) => {
- let stage: ResolvedStage =
- ResolvedStage::decode(stage, codec, session_ctx)?;
- (stage.stage_id, ExecutionStage::Resolved(stage))
- }
- StageType::SuccessfulStage(stage) => {
- let stage: SuccessfulStage =
- SuccessfulStage::decode(stage, codec, session_ctx)?;
- (stage.stage_id, ExecutionStage::Successful(stage))
- }
- StageType::FailedStage(stage) => {
- let stage: FailedStage =
- FailedStage::decode(stage, codec, session_ctx)?;
- (stage.stage_id, ExecutionStage::Failed(stage))
- }
- };
-
- stages.insert(execution_stage.0, execution_stage.1);
- }
-
- let output_locations: Vec<PartitionLocation> = proto
- .output_locations
- .into_iter()
- .map(|loc| loc.try_into())
- .collect::<Result<Vec<_>>>()?;
-
- let failed_stage_attempts = proto
- .failed_attempts
- .into_iter()
- .map(|attempt| {
- (
- attempt.stage_id as usize,
- HashSet::from_iter(
- attempt
- .stage_attempt_num
- .into_iter()
- .map(|num| num as usize),
- ),
- )
- })
- .collect();
-
- Ok(ExecutionGraph {
- scheduler_id:
(!proto.scheduler_id.is_empty()).then_some(proto.scheduler_id),
- job_id: proto.job_id,
- job_name: proto.job_name,
- session_id: proto.session_id,
- status: proto.status.ok_or_else(|| {
- BallistaError::Internal(
- "Invalid Execution Graph: missing job status".to_owned(),
- )
- })?,
- queued_at: proto.queued_at,
- start_time: proto.start_time,
- end_time: proto.end_time,
- stages,
- output_partitions: proto.output_partitions as usize,
- output_locations,
- task_id_gen: proto.task_id_gen as usize,
- failed_stage_attempts,
- })
- }
-
- /// Running stages will not be persisted so that will not be encoded.
- /// Running stages will be convert back to the resolved stages to be
encoded and persisted
- pub(crate) fn encode_execution_graph<
- T: 'static + AsLogicalPlan,
- U: 'static + AsExecutionPlan,
- >(
- graph: ExecutionGraph,
- codec: &BallistaCodec<T, U>,
- ) -> Result<protobuf::ExecutionGraph> {
- let job_id = graph.job_id().to_owned();
-
- let stages = graph
- .stages
- .into_values()
- .map(|stage| {
- let stage_type = match stage {
- ExecutionStage::UnResolved(stage) => {
-
StageType::UnresolvedStage(UnresolvedStage::encode(stage, codec)?)
- }
- ExecutionStage::Resolved(stage) => {
- StageType::ResolvedStage(ResolvedStage::encode(stage,
codec)?)
- }
- ExecutionStage::Running(stage) => StageType::ResolvedStage(
- ResolvedStage::encode(stage.to_resolved(), codec)?,
- ),
- ExecutionStage::Successful(stage) =>
StageType::SuccessfulStage(
- SuccessfulStage::encode(job_id.clone(), stage, codec)?,
- ),
- ExecutionStage::Failed(stage) => StageType::FailedStage(
- FailedStage::encode(job_id.clone(), stage, codec)?,
- ),
- };
- Ok(protobuf::ExecutionGraphStage {
- stage_type: Some(stage_type),
- })
- })
- .collect::<Result<Vec<_>>>()?;
-
- let output_locations: Vec<protobuf::PartitionLocation> = graph
- .output_locations
- .into_iter()
- .map(|loc| loc.try_into())
- .collect::<Result<Vec<_>>>()?;
-
- let failed_attempts: Vec<protobuf::StageAttempts> = graph
- .failed_stage_attempts
- .into_iter()
- .map(|(stage_id, attempts)| {
- let stage_attempt_num = attempts
- .into_iter()
- .map(|num| num as u32)
- .collect::<Vec<_>>();
- protobuf::StageAttempts {
- stage_id: stage_id as u32,
- stage_attempt_num,
- }
- })
- .collect::<Vec<_>>();
-
- Ok(protobuf::ExecutionGraph {
- job_id: graph.job_id,
- job_name: graph.job_name,
- session_id: graph.session_id,
- status: Some(graph.status),
- queued_at: graph.queued_at,
- start_time: graph.start_time,
- end_time: graph.end_time,
- stages,
- output_partitions: graph.output_partitions as u64,
- output_locations,
- scheduler_id: graph.scheduler_id.unwrap_or_default(),
- task_id_gen: graph.task_id_gen as u32,
- failed_attempts,
- })
- }
}
impl Debug for ExecutionGraph {
diff --git a/ballista/scheduler/src/state/execution_graph/execution_stage.rs
b/ballista/scheduler/src/state/execution_graph/execution_stage.rs
index 65a2d012..d919167b 100644
--- a/ballista/scheduler/src/state/execution_graph/execution_stage.rs
+++ b/ballista/scheduler/src/state/execution_graph/execution_stage.rs
@@ -18,7 +18,6 @@
use std::collections::{HashMap, HashSet};
use std::convert::TryInto;
use std::fmt::{Debug, Formatter};
-use std::iter::FromIterator;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
@@ -27,21 +26,17 @@ use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::metrics::{MetricValue, MetricsSet};
use datafusion::physical_plan::{ExecutionPlan, Metric};
-use datafusion::prelude::{SessionConfig, SessionContext};
-use datafusion_proto::logical_plan::AsLogicalPlan;
+use datafusion::prelude::SessionConfig;
use log::{debug, warn};
use ballista_core::error::{BallistaError, Result};
use ballista_core::execution_plans::ShuffleWriterExec;
use ballista_core::serde::protobuf::failed_task::FailedReason;
+use ballista_core::serde::protobuf::{task_status, RunningTask};
use ballista_core::serde::protobuf::{
- self, task_info, FailedTask, GraphStageInput, OperatorMetricsSet,
ResultLost,
- SuccessfulTask, TaskStatus,
+ FailedTask, OperatorMetricsSet, ResultLost, SuccessfulTask, TaskStatus,
};
-use ballista_core::serde::protobuf::{task_status, RunningTask};
use ballista_core::serde::scheduler::PartitionLocation;
-use ballista_core::serde::BallistaCodec;
-use datafusion_proto::physical_plan::AsExecutionPlan;
use crate::display::DisplayableBallistaExecutionPlan;
@@ -207,6 +202,7 @@ pub(crate) struct FailedStage {
pub(crate) partitions: usize,
/// Stage ID of the stage that will take this stages outputs as inputs.
/// If `output_links` is empty then this the final stage in the
`ExecutionGraph`
+ #[allow(dead_code)] // not used at the moment, will be used later
pub(crate) output_links: Vec<usize>,
/// `ExecutionPlan` for this stage
pub(crate) plan: Arc<dyn ExecutionPlan>,
@@ -214,12 +210,14 @@ pub(crate) struct FailedStage {
/// The index of the Vec is the task's partition id
pub(crate) task_infos: Vec<Option<TaskInfo>>,
/// Combined metrics of the already finished tasks in the stage, If it is
None, no task is finished yet.
+ #[allow(dead_code)] // not used at the moment, will be used later
pub(crate) stage_metrics: Option<Vec<MetricsSet>>,
/// Error message
pub(crate) error_message: String,
}
#[derive(Clone)]
+#[allow(dead_code)] // we may use the fields later
pub(crate) struct TaskInfo {
/// Task ID
pub(super) task_id: usize,
@@ -368,54 +366,6 @@ impl UnresolvedStage {
self.last_attempt_failure_reasons.clone(),
))
}
-
- pub(super) fn decode<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>(
- stage: protobuf::UnResolvedStage,
- codec: &BallistaCodec<T, U>,
- session_ctx: &SessionContext,
- ) -> Result<UnresolvedStage> {
- let plan_proto = U::try_decode(&stage.plan)?;
- let plan = plan_proto.try_into_physical_plan(
- session_ctx,
- session_ctx.runtime_env().as_ref(),
- codec.physical_extension_codec(),
- )?;
-
- let inputs = decode_inputs(stage.inputs)?;
-
- Ok(UnresolvedStage {
- stage_id: stage.stage_id as usize,
- stage_attempt_num: stage.stage_attempt_num as usize,
- output_links: stage.output_links.into_iter().map(|l| l as
usize).collect(),
- plan,
- inputs,
- last_attempt_failure_reasons: HashSet::from_iter(
- stage.last_attempt_failure_reasons,
- ),
- })
- }
-
- pub(super) fn encode<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>(
- stage: UnresolvedStage,
- codec: &BallistaCodec<T, U>,
- ) -> Result<protobuf::UnResolvedStage> {
- let mut plan: Vec<u8> = vec![];
- U::try_from_physical_plan(stage.plan, codec.physical_extension_codec())
- .and_then(|proto| proto.try_encode(&mut plan))?;
-
- let inputs = encode_inputs(stage.inputs)?;
-
- Ok(protobuf::UnResolvedStage {
- stage_id: stage.stage_id as u32,
- stage_attempt_num: stage.stage_attempt_num as u32,
- output_links: stage.output_links.into_iter().map(|l| l as
u32).collect(),
- inputs,
- plan,
- last_attempt_failure_reasons: Vec::from_iter(
- stage.last_attempt_failure_reasons,
- ),
- })
- }
}
impl Debug for UnresolvedStage {
@@ -482,56 +432,6 @@ impl ResolvedStage {
);
Ok(unresolved)
}
-
- pub(super) fn decode<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>(
- stage: protobuf::ResolvedStage,
- codec: &BallistaCodec<T, U>,
- session_ctx: &SessionContext,
- ) -> Result<ResolvedStage> {
- let plan_proto = U::try_decode(&stage.plan)?;
- let plan = plan_proto.try_into_physical_plan(
- session_ctx,
- session_ctx.runtime_env().as_ref(),
- codec.physical_extension_codec(),
- )?;
-
- let inputs = decode_inputs(stage.inputs)?;
-
- Ok(ResolvedStage {
- stage_id: stage.stage_id as usize,
- stage_attempt_num: stage.stage_attempt_num as usize,
- partitions: stage.partitions as usize,
- output_links: stage.output_links.into_iter().map(|l| l as
usize).collect(),
- inputs,
- plan,
- last_attempt_failure_reasons: HashSet::from_iter(
- stage.last_attempt_failure_reasons,
- ),
- })
- }
-
- pub(super) fn encode<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>(
- stage: ResolvedStage,
- codec: &BallistaCodec<T, U>,
- ) -> Result<protobuf::ResolvedStage> {
- let mut plan: Vec<u8> = vec![];
- U::try_from_physical_plan(stage.plan, codec.physical_extension_codec())
- .and_then(|proto| proto.try_encode(&mut plan))?;
-
- let inputs = encode_inputs(stage.inputs)?;
-
- Ok(protobuf::ResolvedStage {
- stage_id: stage.stage_id as u32,
- stage_attempt_num: stage.stage_attempt_num as u32,
- partitions: stage.partitions as u32,
- output_links: stage.output_links.into_iter().map(|l| l as
u32).collect(),
- inputs,
- plan,
- last_attempt_failure_reasons: Vec::from_iter(
- stage.last_attempt_failure_reasons,
- ),
- })
- }
}
impl Debug for ResolvedStage {
@@ -611,18 +511,6 @@ impl RunningStage {
}
}
- /// Change to the resolved state and bump the stage attempt number
- pub(super) fn to_resolved(&self) -> ResolvedStage {
- ResolvedStage::new(
- self.stage_id,
- self.stage_attempt_num + 1,
- self.plan.clone(),
- self.output_links.clone(),
- self.inputs.clone(),
- HashSet::new(),
- )
- }
-
/// Change to the unresolved state and bump the stage attempt number
pub(super) fn to_unresolved(
&self,
@@ -952,80 +840,6 @@ impl SuccessfulStage {
}
reset
}
-
- pub(super) fn decode<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>(
- stage: protobuf::SuccessfulStage,
- codec: &BallistaCodec<T, U>,
- session_ctx: &SessionContext,
- ) -> Result<SuccessfulStage> {
- let plan_proto = U::try_decode(&stage.plan)?;
- let plan = plan_proto.try_into_physical_plan(
- session_ctx,
- session_ctx.runtime_env().as_ref(),
- codec.physical_extension_codec(),
- )?;
-
- let inputs = decode_inputs(stage.inputs)?;
- assert_eq!(
- stage.task_infos.len(),
- stage.partitions as usize,
- "protobuf::SuccessfulStage task_infos len not equal to partitions."
- );
- let task_infos =
stage.task_infos.into_iter().map(decode_taskinfo).collect();
- let stage_metrics = stage
- .stage_metrics
- .into_iter()
- .map(|m| m.try_into())
- .collect::<Result<Vec<_>>>()?;
-
- Ok(SuccessfulStage {
- stage_id: stage.stage_id as usize,
- stage_attempt_num: stage.stage_attempt_num as usize,
- partitions: stage.partitions as usize,
- output_links: stage.output_links.into_iter().map(|l| l as
usize).collect(),
- inputs,
- plan,
- task_infos,
- stage_metrics,
- })
- }
-
- pub(super) fn encode<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>(
- _job_id: String,
- stage: SuccessfulStage,
- codec: &BallistaCodec<T, U>,
- ) -> Result<protobuf::SuccessfulStage> {
- let stage_id = stage.stage_id;
-
- let mut plan: Vec<u8> = vec![];
- U::try_from_physical_plan(stage.plan, codec.physical_extension_codec())
- .and_then(|proto| proto.try_encode(&mut plan))?;
-
- let inputs = encode_inputs(stage.inputs)?;
- let task_infos = stage
- .task_infos
- .into_iter()
- .enumerate()
- .map(|(partition, task_info)| encode_taskinfo(task_info,
partition))
- .collect();
-
- let stage_metrics = stage
- .stage_metrics
- .into_iter()
- .map(|m| m.try_into())
- .collect::<Result<Vec<_>>>()?;
-
- Ok(protobuf::SuccessfulStage {
- stage_id: stage_id as u32,
- stage_attempt_num: stage.stage_attempt_num as u32,
- partitions: stage.partitions as u32,
- output_links: stage.output_links.into_iter().map(|l| l as
u32).collect(),
- inputs,
- plan,
- task_infos,
- stage_metrics,
- })
- }
}
impl Debug for SuccessfulStage {
@@ -1071,85 +885,6 @@ impl FailedStage {
pub(super) fn available_tasks(&self) -> usize {
self.task_infos.iter().filter(|s| s.is_none()).count()
}
-
- pub(super) fn decode<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>(
- stage: protobuf::FailedStage,
- codec: &BallistaCodec<T, U>,
- session_ctx: &SessionContext,
- ) -> Result<FailedStage> {
- let plan_proto = U::try_decode(&stage.plan)?;
- let plan = plan_proto.try_into_physical_plan(
- session_ctx,
- session_ctx.runtime_env().as_ref(),
- codec.physical_extension_codec(),
- )?;
-
- let mut task_infos: Vec<Option<TaskInfo>> = vec![None;
stage.partitions as usize];
- for info in stage.task_infos {
- task_infos[info.partition_id as usize] =
Some(decode_taskinfo(info.clone()));
- }
-
- let stage_metrics = if stage.stage_metrics.is_empty() {
- None
- } else {
- let ms = stage
- .stage_metrics
- .into_iter()
- .map(|m| m.try_into())
- .collect::<Result<Vec<_>>>()?;
- Some(ms)
- };
-
- Ok(FailedStage {
- stage_id: stage.stage_id as usize,
- stage_attempt_num: stage.stage_attempt_num as usize,
- partitions: stage.partitions as usize,
- output_links: stage.output_links.into_iter().map(|l| l as
usize).collect(),
- plan,
- task_infos,
- stage_metrics,
- error_message: stage.error_message,
- })
- }
-
- pub(super) fn encode<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>(
- _job_id: String,
- stage: FailedStage,
- codec: &BallistaCodec<T, U>,
- ) -> Result<protobuf::FailedStage> {
- let stage_id = stage.stage_id;
-
- let mut plan: Vec<u8> = vec![];
- U::try_from_physical_plan(stage.plan, codec.physical_extension_codec())
- .and_then(|proto| proto.try_encode(&mut plan))?;
-
- let task_infos: Vec<protobuf::TaskInfo> = stage
- .task_infos
- .into_iter()
- .enumerate()
- .filter_map(|(partition, task_info)| {
- task_info.map(|info| encode_taskinfo(info, partition))
- })
- .collect();
-
- let stage_metrics = stage
- .stage_metrics
- .unwrap_or_default()
- .into_iter()
- .map(|m| m.try_into())
- .collect::<Result<Vec<_>>>()?;
-
- Ok(protobuf::FailedStage {
- stage_id: stage_id as u32,
- stage_attempt_num: stage.stage_attempt_num as u32,
- partitions: stage.partitions as u32,
- output_links: stage.output_links.into_iter().map(|l| l as
u32).collect(),
- plan,
- task_infos,
- stage_metrics,
- error_message: stage.error_message,
- })
- }
}
impl Debug for FailedStage {
@@ -1220,106 +955,3 @@ impl StageOutput {
self.complete
}
}
-
-fn decode_inputs(
- stage_inputs: Vec<GraphStageInput>,
-) -> Result<HashMap<usize, StageOutput>> {
- let mut inputs: HashMap<usize, StageOutput> = HashMap::new();
- for input in stage_inputs {
- let stage_id = input.stage_id as usize;
-
- let outputs = input
- .partition_locations
- .into_iter()
- .map(|loc| {
- let partition = loc.partition as usize;
- let locations = loc
- .partition_location
- .into_iter()
- .map(|l| l.try_into())
- .collect::<Result<Vec<_>>>()?;
- Ok((partition, locations))
- })
- .collect::<Result<HashMap<usize, Vec<PartitionLocation>>>>()?;
-
- inputs.insert(
- stage_id,
- StageOutput {
- partition_locations: outputs,
- complete: input.complete,
- },
- );
- }
- Ok(inputs)
-}
-
-fn encode_inputs(
- stage_inputs: HashMap<usize, StageOutput>,
-) -> Result<Vec<GraphStageInput>> {
- let mut inputs: Vec<protobuf::GraphStageInput> = vec![];
- for (stage_id, output) in stage_inputs.into_iter() {
- inputs.push(protobuf::GraphStageInput {
- stage_id: stage_id as u32,
- partition_locations: output
- .partition_locations
- .into_iter()
- .map(|(partition, locations)| {
- Ok(protobuf::TaskInputPartitions {
- partition: partition as u32,
- partition_location: locations
- .into_iter()
- .map(|l| l.try_into())
- .collect::<Result<Vec<_>>>()?,
- })
- })
- .collect::<Result<Vec<_>>>()?,
- complete: output.complete,
- });
- }
- Ok(inputs)
-}
-
-fn decode_taskinfo(task_info: protobuf::TaskInfo) -> TaskInfo {
- let task_info_status = match task_info.status {
- Some(task_info::Status::Running(running)) => {
- task_status::Status::Running(running)
- }
- Some(task_info::Status::Failed(failed)) =>
task_status::Status::Failed(failed),
- Some(task_info::Status::Successful(success)) => {
- task_status::Status::Successful(success)
- }
- _ => panic!(
- "protobuf::TaskInfo status for task {} should not be none",
- task_info.task_id
- ),
- };
- TaskInfo {
- task_id: task_info.task_id as usize,
- scheduled_time: task_info.scheduled_time as u128,
- launch_time: task_info.launch_time as u128,
- start_exec_time: task_info.start_exec_time as u128,
- end_exec_time: task_info.end_exec_time as u128,
- finish_time: task_info.finish_time as u128,
- task_status: task_info_status,
- }
-}
-
-fn encode_taskinfo(task_info: TaskInfo, partition_id: usize) ->
protobuf::TaskInfo {
- let task_info_status = match task_info.task_status {
- task_status::Status::Running(running) =>
task_info::Status::Running(running),
- task_status::Status::Failed(failed) =>
task_info::Status::Failed(failed),
- task_status::Status::Successful(success) => {
- task_info::Status::Successful(success)
- }
- };
- protobuf::TaskInfo {
- task_id: task_info.task_id as u32,
- partition_id: partition_id as u32,
- scheduled_time: task_info.scheduled_time as u64,
- launch_time: task_info.launch_time as u64,
- start_exec_time: task_info.start_exec_time as u64,
- end_exec_time: task_info.end_exec_time as u64,
- finish_time: task_info.finish_time as u64,
- status: Some(task_info_status),
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]