This is an automated email from the ASF dual-hosted git repository.
milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 97a74521d feat: add remote hash policy as not used (#1526)
97a74521d is described below
commit 97a74521de39568d695e4ccab2dfdb50a79e9feb
Author: Saj <[email protected]>
AuthorDate: Fri Mar 27 17:27:50 2026 +0000
feat: add remote hash policy as not used (#1526)
---
ballista/core/src/consistent_hash/mod.rs | 343 ---------------------
ballista/core/src/consistent_hash/node.rs | 27 --
ballista/core/src/lib.rs | 2 -
ballista/scheduler/src/cluster/memory.rs | 93 +-----
ballista/scheduler/src/cluster/mod.rs | 384 +-----------------------
ballista/scheduler/src/config.rs | 24 --
ballista/scheduler/src/scheduler_server/grpc.rs | 5 -
docs/source/user-guide/configs.md | 2 +-
8 files changed, 19 insertions(+), 861 deletions(-)
diff --git a/ballista/core/src/consistent_hash/mod.rs
b/ballista/core/src/consistent_hash/mod.rs
deleted file mode 100644
index 3be7be872..000000000
--- a/ballista/core/src/consistent_hash/mod.rs
+++ /dev/null
@@ -1,343 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Consistent hashing implementation for distributing data across nodes.
-
-use crate::consistent_hash::node::Node;
-use md5::{Digest, Md5};
-use std::collections::{BTreeMap, HashMap};
-
-/// Node trait and implementations for consistent hashing.
-pub mod node;
-
-/// Function type for computing hash values from byte slices.
-pub type HashFunction = fn(&[u8]) -> Vec<u8>;
-
-/// A consistent hash ring for distributing keys across nodes.
-///
-/// Uses virtual nodes (replicas) to ensure even distribution of keys
-/// and provides tolerance for node failures.
-pub struct ConsistentHash<N>
-where
- N: Node,
-{
- virtual_nodes: BTreeMap<Vec<u8>, String>,
- node_replicas: HashMap<String, (N, usize)>,
- hash_func: HashFunction,
-}
-
-impl<N> ConsistentHash<N>
-where
- N: Node,
-{
- /// Creates a new consistent hash ring with the given nodes and replica
counts.
- ///
- /// Uses MD5 as the default hash function.
- pub fn new(node_replicas: Vec<(N, usize)>) -> Self {
- let consistent_hash = Self {
- virtual_nodes: BTreeMap::new(),
- node_replicas: HashMap::new(),
- hash_func: md5_hash,
- };
- consistent_hash.init(node_replicas)
- }
-
- /// Creates a new consistent hash ring with a custom hash function.
- pub fn new_with_hash(
- node_replicas: Vec<(N, usize)>,
- hash_func: HashFunction,
- ) -> Self {
- let consistent_hash = Self {
- virtual_nodes: BTreeMap::new(),
- node_replicas: HashMap::new(),
- hash_func,
- };
- consistent_hash.init(node_replicas)
- }
-
- fn init(mut self, node_replicas: Vec<(N, usize)>) -> Self {
- node_replicas.into_iter().for_each(|(node, num_replicas)| {
- self.add(node, num_replicas);
- });
- self
- }
-
- /// Returns references to all nodes in the hash ring.
- pub fn nodes(&self) -> Vec<&N> {
- self.node_replicas
- .values()
- .map(|(node, _)| node)
- .collect::<Vec<_>>()
- }
-
- /// Returns mutable references to all nodes in the hash ring.
- pub fn nodes_mut(&mut self) -> Vec<&mut N> {
- self.node_replicas
- .values_mut()
- .map(|(node, _)| node)
- .collect::<Vec<_>>()
- }
-
- /// Adds a node to the hash ring with the specified number of virtual
replicas.
- pub fn add(&mut self, node: N, num_replicas: usize) {
- // Remove existing ones
- self.remove(node.name());
-
- for i in 0..num_replicas {
- let vnode_id = format!("{}:{i}", node.name());
- let vnode_key = (self.hash_func)(vnode_id.as_bytes());
- self.virtual_nodes
- .insert(vnode_key, node.name().to_string());
- }
- self.node_replicas
- .insert(node.name().to_string(), (node, num_replicas));
- }
-
- /// Removes a node from the hash ring by name, returning the node and
replica count if found.
- pub fn remove(&mut self, node_name: &str) -> Option<(N, usize)> {
- if let Some((node, num_replicas)) =
self.node_replicas.remove(node_name) {
- for i in 0..num_replicas {
- let vnode_id = format!("{node_name}:{i}");
- let vnode_key = (self.hash_func)(vnode_id.as_bytes());
- self.virtual_nodes.remove(vnode_key.as_slice());
- }
- Some((node, num_replicas))
- } else {
- None
- }
- }
-
- /// Gets the node responsible for the given key.
- pub fn get(&self, key: &[u8]) -> Option<&N> {
- self.get_with_tolerance(key, 0)
- }
-
- /// Gets the node responsible for the given key, skipping up to
`tolerance` invalid nodes.
- pub fn get_with_tolerance(&self, key: &[u8], tolerance: usize) ->
Option<&N> {
- self.get_position_key(key, tolerance)
- .and_then(move |position_key| {
- self.virtual_nodes
- .get(&position_key)
- .map(|node_name|
&(self.node_replicas.get(node_name).unwrap().0))
- })
- }
-
- /// Gets a mutable reference to the node responsible for the given key.
- pub fn get_mut(&mut self, key: &[u8]) -> Option<&mut N> {
- self.get_mut_with_tolerance(key, 0)
- }
-
- /// Gets a mutable reference to the node for the given key, with failure
tolerance.
- pub fn get_mut_with_tolerance(
- &mut self,
- key: &[u8],
- tolerance: usize,
- ) -> Option<&mut N> {
- self.get_position_key(key, tolerance)
- .and_then(move |position_key| {
- if let Some(node_name) = self.virtual_nodes.get(&position_key)
{
- Some(&mut
(self.node_replicas.get_mut(node_name).unwrap().0))
- } else {
- None
- }
- })
- }
-
- fn get_position_key(&self, key: &[u8], tolerance: usize) ->
Option<Vec<u8>> {
- if self.node_replicas.is_empty() {
- return None;
- };
-
- let mut tolerance = if tolerance >= self.virtual_nodes.len() {
- self.virtual_nodes.len() - 1
- } else {
- tolerance
- };
- let hashed_key = (self.hash_func)(key);
- for (position_key, node_name) in self
- .virtual_nodes
- .range(hashed_key..)
- .chain(self.virtual_nodes.iter())
- {
- if let Some((node, _)) = self.node_replicas.get(node_name)
- && node.is_valid()
- {
- return Some(position_key.clone());
- }
- if tolerance == 0 {
- return None;
- } else {
- tolerance -= 1;
- }
- }
-
- None
- }
-}
-
-/// Computes an MD5 hash of the input data.
-pub fn md5_hash(data: &[u8]) -> Vec<u8> {
- let mut digest = Md5::default();
- digest.update(data);
- digest.finalize().to_vec()
-}
-
-#[cfg(test)]
-mod test {
- use crate::consistent_hash::ConsistentHash;
- use crate::consistent_hash::node::Node;
-
- #[test]
- fn test_topology() {
- let (mut consistent_hash, nodes, keys) = prepare_consistent_hash();
-
- // Test removal case
- let (node, num_replicas) =
consistent_hash.remove(nodes[3].name()).unwrap();
- for (i, key) in keys.iter().enumerate() {
- if i == 3 {
- assert_ne!(
- consistent_hash.get(key.as_bytes()).unwrap().name(),
- nodes[i].name()
- );
- } else {
- assert_eq!(
- consistent_hash.get(key.as_bytes()).unwrap().name(),
- nodes[i].name()
- );
- }
- }
-
- // Test adding case
- consistent_hash.add(node, num_replicas);
- for (i, key) in keys.iter().enumerate() {
- assert_eq!(
- consistent_hash.get(key.as_bytes()).unwrap().name(),
- nodes[i].name()
- );
- }
- }
-
- #[test]
- fn test_tolerance() {
- let (mut consistent_hash, nodes, keys) = prepare_consistent_hash();
- let (mut node, num_replicas) =
consistent_hash.remove(nodes[2].name()).unwrap();
- node.available = false;
- consistent_hash.add(node, num_replicas);
- for (i, key) in keys.iter().enumerate() {
- if i == 2 {
- assert!(consistent_hash.get(key.as_bytes()).is_none());
- assert!(
- consistent_hash
- .get_with_tolerance(key.as_bytes(), 1)
- .is_some()
- );
- } else {
- assert_eq!(
- consistent_hash.get(key.as_bytes()).unwrap().name(),
- nodes[i].name()
- );
- }
- }
-
- for (i, node) in nodes.iter().enumerate() {
- if i != 2 && i != 1 {
- let (mut node, num_replicas) =
- consistent_hash.remove(node.name()).unwrap();
- node.available = false;
- consistent_hash.add(node, num_replicas);
- }
- }
- for (i, key) in keys.iter().enumerate() {
- if i == 1 {
- assert_eq!(
- consistent_hash.get(key.as_bytes()).unwrap().name(),
- nodes[i].name()
- );
- } else {
- assert!(consistent_hash.get(key.as_bytes()).is_none());
- }
- assert_eq!(
- consistent_hash
- .get_with_tolerance(key.as_bytes(), usize::MAX)
- .unwrap()
- .name(),
- nodes[1].name()
- );
- }
- }
-
- #[derive(Clone)]
- struct ServerNode {
- name: String,
- available: bool,
- }
-
- impl ServerNode {
- fn new(host: &str, port: u16) -> Self {
- Self::new_with_available(host, port, true)
- }
-
- fn new_with_available(host: &str, port: u16, available: bool) -> Self {
- Self {
- name: format!("{host}:{port}"),
- available,
- }
- }
- }
-
- impl Node for ServerNode {
- fn name(&self) -> &str {
- &self.name
- }
-
- fn is_valid(&self) -> bool {
- self.available
- }
- }
-
- fn prepare_consistent_hash() -> (
- ConsistentHash<ServerNode>,
- Vec<ServerNode>,
- Vec<&'static str>,
- ) {
- let num_replicas = 20usize;
-
- let nodes = vec![
- ServerNode::new("localhost", 10000),
- ServerNode::new("localhost", 10001),
- ServerNode::new("localhost", 10002),
- ServerNode::new("localhost", 10003),
- ServerNode::new("localhost", 10004),
- ];
-
- let node_replicas = nodes
- .iter()
- .map(|node| (node.clone(), num_replicas))
- .collect::<Vec<_>>();
- let consistent_hash = ConsistentHash::new(node_replicas);
-
- let keys = vec!["1", "4", "5", "3", "2"];
- for (i, key) in keys.iter().enumerate() {
- assert_eq!(
- consistent_hash.get(key.as_bytes()).unwrap().name(),
- nodes[i].name()
- );
- }
-
- (consistent_hash, nodes, keys)
- }
-}
diff --git a/ballista/core/src/consistent_hash/node.rs
b/ballista/core/src/consistent_hash/node.rs
deleted file mode 100644
index 8e995bc09..000000000
--- a/ballista/core/src/consistent_hash/node.rs
+++ /dev/null
@@ -1,27 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Node trait for consistent hashing.
-
-/// A trait representing a node in a consistent hash ring.
-pub trait Node {
- /// Returns the unique name identifier for this node.
- fn name(&self) -> &str;
-
- /// Returns whether this node is currently valid and can receive requests.
- fn is_valid(&self) -> bool;
-}
diff --git a/ballista/core/src/lib.rs b/ballista/core/src/lib.rs
index c9c4ef1d5..7211772f5 100644
--- a/ballista/core/src/lib.rs
+++ b/ballista/core/src/lib.rs
@@ -35,8 +35,6 @@ pub fn print_version() {
pub mod client;
/// Configuration options and settings for Ballista components.
pub mod config;
-/// Consistent hashing implementation for data distribution.
-pub mod consistent_hash;
/// Utilities for generating execution plan diagrams.
pub mod diagram;
/// Error types and result definitions for Ballista operations.
diff --git a/ballista/scheduler/src/cluster/memory.rs
b/ballista/scheduler/src/cluster/memory.rs
index f3f70e99c..721d068f9 100644
--- a/ballista/scheduler/src/cluster/memory.rs
+++ b/ballista/scheduler/src/cluster/memory.rs
@@ -17,9 +17,7 @@
use crate::cluster::{
BoundTask, ClusterState, ExecutorSlot, JobState, JobStateEvent,
JobStateEventStream,
- JobStatus, TaskDistributionPolicy, TopologyNode, bind_task_bias,
- bind_task_consistent_hash, bind_task_round_robin, get_scan_files,
- is_skip_consistent_hash,
+ JobStatus, TaskDistributionPolicy, bind_task_bias, bind_task_round_robin,
};
use crate::state::execution_graph::ExecutionGraphBox;
use async_trait::async_trait;
@@ -39,14 +37,12 @@ use crate::scheduler_server::{SessionBuilder,
timestamp_millis, timestamp_secs};
use crate::state::session_manager::create_datafusion_context;
use crate::state::task_manager::JobInfoCache;
use ballista_core::serde::protobuf::job_status::Status;
-use log::{error, info, warn};
+use log::{error, warn};
use std::collections::{HashMap, HashSet};
use std::ops::DerefMut;
-use ballista_core::consistent_hash::node::Node;
-use datafusion::physical_plan::ExecutionPlan;
use std::sync::Arc;
-use tokio::sync::{Mutex, MutexGuard};
+use tokio::sync::Mutex;
use super::{ClusterStateEvent, ClusterStateEventStream};
@@ -67,44 +63,6 @@ pub struct InMemoryClusterState {
cluster_event_sender: ClusterEventSender<ClusterStateEvent>,
}
-impl InMemoryClusterState {
- /// Get the topology nodes of the cluster for consistent hashing
- fn get_topology_nodes(
- &self,
- guard: &MutexGuard<HashMap<String, AvailableTaskSlots>>,
- executors: Option<HashSet<String>>,
- ) -> HashMap<String, TopologyNode> {
- let mut nodes: HashMap<String, TopologyNode> = HashMap::new();
- for (executor_id, slots) in guard.iter() {
- if let Some(executors) = executors.as_ref()
- && !executors.contains(executor_id)
- {
- continue;
- }
- if let Some(executor) = self.executors.get(&slots.executor_id) {
- let node = TopologyNode::new(
- &executor.host,
- executor.port,
- &slots.executor_id,
- self.heartbeats
- .get(&executor.id)
- .map(|heartbeat| heartbeat.timestamp)
- .unwrap_or(0),
- slots.slots,
- );
- if let Some(existing_node) = nodes.get(node.name()) {
- if existing_node.last_seen_ts < node.last_seen_ts {
- nodes.insert(node.name().to_string(), node);
- }
- } else {
- nodes.insert(node.name().to_string(), node);
- }
- }
- }
- nodes
- }
-}
-
#[async_trait]
impl ClusterState for InMemoryClusterState {
async fn bind_schedulable_tasks(
@@ -134,51 +92,6 @@ impl ClusterState for InMemoryClusterState {
TaskDistributionPolicy::RoundRobin => {
bind_task_round_robin(available_slots, active_jobs, |_|
false).await
}
- TaskDistributionPolicy::ConsistentHash {
- num_replicas,
- tolerance,
- } => {
- let mut bound_tasks = bind_task_round_robin(
- available_slots,
- active_jobs.clone(),
- |stage_plan: Arc<dyn ExecutionPlan>| {
- if let Ok(scan_files) = get_scan_files(stage_plan) {
- // Should be opposite to consistent hash ones.
- !is_skip_consistent_hash(&scan_files)
- } else {
- false
- }
- },
- )
- .await;
- info!("{} tasks bound by round robin policy",
bound_tasks.len());
- let (bound_tasks_consistent_hash, ch_topology) =
- bind_task_consistent_hash(
- self.get_topology_nodes(&guard, executors),
- num_replicas,
- tolerance,
- active_jobs,
- |_, plan| get_scan_files(plan),
- )
- .await?;
- info!(
- "{} tasks bound by consistent hashing policy",
- bound_tasks_consistent_hash.len()
- );
- if !bound_tasks_consistent_hash.is_empty() {
- bound_tasks.extend(bound_tasks_consistent_hash);
- // Update the available slots
- let ch_topology = ch_topology.unwrap();
- for node in ch_topology.nodes() {
- if let Some(data) = guard.get_mut(&node.id) {
- data.slots = node.available_slots;
- } else {
- error!("Fail to find executor data for {}",
&node.id);
- }
- }
- }
- bound_tasks
- }
TaskDistributionPolicy::Custom(ref policy) => {
policy.bind_tasks(available_slots, active_jobs).await?
}
diff --git a/ballista/scheduler/src/cluster/mod.rs
b/ballista/scheduler/src/cluster/mod.rs
index 552faa265..58c63530f 100644
--- a/ballista/scheduler/src/cluster/mod.rs
+++ b/ballista/scheduler/src/cluster/mod.rs
@@ -15,38 +15,27 @@
// specific language governing permissions and limitations
// under the License.
-use std::collections::{HashMap, HashSet};
-use std::pin::Pin;
-use std::sync::Arc;
-
-use datafusion::common::tree_node::TreeNode;
-use datafusion::common::tree_node::TreeNodeRecursion;
-use datafusion::datasource::listing::PartitionedFile;
-use datafusion::datasource::physical_plan::FileScanConfig;
-use datafusion::datasource::source::DataSourceExec;
-use datafusion::error::DataFusionError;
-use datafusion::physical_plan::ExecutionPlan;
-use datafusion::prelude::{SessionConfig, SessionContext};
-use futures::Stream;
-use log::debug;
-
-use ballista_core::consistent_hash::ConsistentHash;
-use ballista_core::error::Result;
-use ballista_core::serde::protobuf::{
- AvailableTaskSlots, ExecutorHeartbeat, JobStatus, job_status,
-};
-use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata,
PartitionId};
-use ballista_core::utils::{default_config_producer, default_session_builder};
-use ballista_core::{ConfigProducer, JobStatusSubscriber, consistent_hash};
-
use crate::cluster::memory::{InMemoryClusterState, InMemoryJobState};
-
use crate::config::{SchedulerConfig, TaskDistributionPolicy};
use crate::scheduler_server::SessionBuilder;
use crate::state::execution_graph::{
ExecutionGraphBox, TaskDescription, create_task_info,
};
use crate::state::task_manager::JobInfoCache;
+use ballista_core::error::Result;
+use ballista_core::serde::protobuf::{
+ AvailableTaskSlots, ExecutorHeartbeat, JobStatus, job_status,
+};
+use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata,
PartitionId};
+use ballista_core::utils::{default_config_producer, default_session_builder};
+use ballista_core::{ConfigProducer, JobStatusSubscriber};
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::{SessionConfig, SessionContext};
+use futures::Stream;
+use log::debug;
+use std::collections::{HashMap, HashSet};
+use std::pin::Pin;
+use std::sync::Arc;
/// Event broadcasting and subscription for cluster state changes.
pub mod event;
@@ -534,12 +523,6 @@ pub(crate) async fn bind_task_round_robin(
schedulable_tasks
}
-/// Maps execution plan to list of files it scans
-type GetScanFilesFunc = fn(
- &str,
- Arc<dyn ExecutionPlan>,
-) -> datafusion::common::Result<Vec<Vec<Vec<PartitionedFile>>>>;
-
/// User provided task distribution policy
#[async_trait::async_trait]
pub trait DistributionPolicy: std::fmt::Debug + Send + Sync {
@@ -572,208 +555,16 @@ pub trait DistributionPolicy: std::fmt::Debug + Send +
Sync {
fn name(&self) -> &str;
}
-pub(crate) async fn bind_task_consistent_hash(
- topology_nodes: HashMap<String, TopologyNode>,
- num_replicas: usize,
- tolerance: usize,
- running_jobs: Arc<HashMap<String, JobInfoCache>>,
- get_scan_files: GetScanFilesFunc,
-) -> Result<(Vec<BoundTask>, Option<ConsistentHash<TopologyNode>>)> {
- let mut total_slots = 0usize;
- for (_, node) in topology_nodes.iter() {
- total_slots += node.available_slots as usize;
- }
- if total_slots == 0 {
- debug!(
- "Not enough available executor slots for binding tasks with
consistent hashing policy!!!"
- );
- return Ok((vec![], None));
- }
- debug!("Total slot number for consistent hash binding is {total_slots}");
-
- let node_replicas = topology_nodes
- .into_values()
- .map(|node| (node, num_replicas))
- .collect::<Vec<_>>();
- let mut ch_topology: ConsistentHash<TopologyNode> =
- ConsistentHash::new(node_replicas);
-
- let mut schedulable_tasks: Vec<BoundTask> = vec![];
- for (job_id, job_info) in running_jobs.iter() {
- if !matches!(job_info.status, Some(job_status::Status::Running(_))) {
- debug!("Job {job_id} is not in running status and will be
skipped");
- continue;
- }
- let mut graph = job_info.execution_graph.write().await;
- let session_id = graph.session_id().to_string();
- let mut black_list = vec![];
- while let Some((running_stage, task_id_gen)) =
- graph.fetch_running_stage(&black_list)
- {
- let scan_files = get_scan_files(job_id,
running_stage.plan.clone())?;
- if is_skip_consistent_hash(&scan_files) {
- debug!(
- "Will skip stage {}/{} for consistent hashing task
binding",
- job_id, running_stage.stage_id
- );
- black_list.push(running_stage.stage_id);
- continue;
- }
- let pre_total_slots = total_slots;
- let scan_files = &scan_files[0];
- let tolerance_list = vec![0, tolerance];
- // First round with 0 tolerance consistent hashing policy
- // Second round with [`tolerance`] tolerance consistent hashing
policy
- for tolerance in tolerance_list {
- let runnable_tasks = running_stage
- .task_infos
- .iter_mut()
- .enumerate()
- .filter(|(_partition, info)| info.is_none())
- .take(total_slots)
- .collect::<Vec<_>>();
- for (partition_id, task_info) in runnable_tasks {
- let partition_files = &scan_files[partition_id];
- assert!(!partition_files.is_empty());
- // Currently we choose the first file for a task for
consistent hash.
- // Later when splitting files for tasks in datafusion,
it's better to
- // introduce this hash based policy besides the file
number policy or file size policy.
- let file_for_hash = &partition_files[0];
- if let Some(node) = ch_topology.get_mut_with_tolerance(
- file_for_hash.object_meta.location.as_ref().as_bytes(),
- tolerance,
- ) {
- let executor_id = node.id.clone();
- let task_id = *task_id_gen;
- *task_id_gen += 1;
- *task_info =
Some(create_task_info(executor_id.clone(), task_id));
-
- let partition = PartitionId {
- job_id: job_id.clone(),
- stage_id: running_stage.stage_id,
- partition_id,
- };
- let task_desc = TaskDescription {
- session_id: session_id.clone(),
- partition,
- stage_attempt_num: running_stage.stage_attempt_num,
- task_id,
- task_attempt: running_stage.task_failure_numbers
- [partition_id],
- plan: running_stage.plan.clone(),
- session_config:
running_stage.session_config.clone(),
- };
- schedulable_tasks.push((executor_id, task_desc));
-
- node.available_slots -= 1;
- total_slots -= 1;
- if total_slots == 0 {
- return Ok((schedulable_tasks, Some(ch_topology)));
- }
- }
- }
- }
- // Since there's no more tasks from this stage which can be bound,
- // we should skip this stage at the next round.
- if pre_total_slots == total_slots {
- black_list.push(running_stage.stage_id);
- }
- }
- }
-
- Ok((schedulable_tasks, Some(ch_topology)))
-}
-
-// If if there's no plan which needs to scan files, skip it.
-// Or there are multiple plans which need to scan files for a stage, skip it.
-pub(crate) fn is_skip_consistent_hash(scan_files:
&[Vec<Vec<PartitionedFile>>]) -> bool {
- scan_files.is_empty() || scan_files.len() > 1
-}
-
-/// Get all of the [`PartitionedFile`] to be scanned for an [`ExecutionPlan`]
-pub(crate) fn get_scan_files(
- plan: Arc<dyn ExecutionPlan>,
-) -> std::result::Result<Vec<Vec<Vec<PartitionedFile>>>, DataFusionError> {
- let mut collector: Vec<Vec<Vec<PartitionedFile>>> = vec![];
- plan.apply(&mut |plan: &Arc<dyn ExecutionPlan>| {
- let plan_any = plan.as_any();
-
- if let Some(config) = plan_any
- .downcast_ref::<DataSourceExec>()
- .and_then(|c|
c.data_source().as_any().downcast_ref::<FileScanConfig>())
- {
- collector.push(
- config
- .file_groups
- .iter()
- .map(|f| f.clone().into_inner())
- .collect(),
- );
- Ok(TreeNodeRecursion::Jump)
- } else {
- Ok(TreeNodeRecursion::Continue)
- }
- })?;
- Ok(collector)
-}
-
-/// Represents a node in the cluster topology for consistent hashing.
-#[derive(Clone)]
-pub struct TopologyNode {
- /// Unique executor ID.
- pub id: String,
- /// Host:port name for the node.
- pub name: String,
- /// Timestamp of last heartbeat received.
- pub last_seen_ts: u64,
- /// Number of available task slots on this node.
- pub available_slots: u32,
-}
-
-impl TopologyNode {
- fn new(
- host: &str,
- port: u16,
- id: &str,
- last_seen_ts: u64,
- available_slots: u32,
- ) -> Self {
- Self {
- id: id.to_string(),
- name: format!("{host}:{port}"),
- last_seen_ts,
- available_slots,
- }
- }
-}
-
-impl consistent_hash::node::Node for TopologyNode {
- fn name(&self) -> &str {
- &self.name
- }
-
- fn is_valid(&self) -> bool {
- self.available_slots > 0
- }
-}
-
#[cfg(test)]
mod test {
use std::collections::HashMap;
use std::sync::Arc;
- use datafusion::datasource::listing::PartitionedFile;
- use object_store::ObjectMeta;
- use object_store::path::Path;
-
use ballista_core::error::Result;
use ballista_core::serde::protobuf::AvailableTaskSlots;
use ballista_core::serde::scheduler::{ExecutorMetadata,
ExecutorSpecification};
- use crate::cluster::{
- BoundTask, TopologyNode, bind_task_bias, bind_task_consistent_hash,
- bind_task_round_robin,
- };
+ use crate::cluster::{BoundTask, bind_task_bias, bind_task_round_robin};
use crate::state::execution_graph::{ExecutionGraph, StaticExecutionGraph};
use crate::state::task_manager::JobInfoCache;
use crate::test_utils::{
@@ -887,100 +678,6 @@ mod test {
Ok(())
}
- #[tokio::test]
- async fn test_bind_task_consistent_hash() -> Result<()> {
- let num_partition = 8usize;
- let active_jobs = mock_active_jobs(num_partition).await?;
- let active_jobs = Arc::new(active_jobs);
- let topology_nodes = mock_topology_nodes();
- let num_replicas = 31;
- let tolerance = 0;
-
- // Check none scan files case
- {
- let (bound_tasks, _) = bind_task_consistent_hash(
- topology_nodes.clone(),
- num_replicas,
- tolerance,
- active_jobs.clone(),
- |_, _| Ok(vec![]),
- )
- .await?;
- assert_eq!(0, bound_tasks.len());
- }
-
- // Check job_b with scan files
- {
- let (bound_tasks, _) = bind_task_consistent_hash(
- topology_nodes,
- num_replicas,
- tolerance,
- active_jobs,
- |job_id, _| mock_get_scan_files("job_b", job_id, 8),
- )
- .await?;
- assert_eq!(6, bound_tasks.len());
-
- let result = get_result(bound_tasks);
-
- let mut expected = HashMap::new();
- {
- let mut entry_b = HashMap::new();
- entry_b.insert("executor_3".to_string(), 2);
- entry_b.insert("executor_2".to_string(), 3);
- entry_b.insert("executor_1".to_string(), 1);
-
- expected.insert("job_b".to_string(), entry_b);
- }
- assert!(
- expected.eq(&result),
- "The result {result:?} is not as expected {expected:?}"
- );
- }
-
- Ok(())
- }
-
- #[tokio::test]
- async fn test_bind_task_consistent_hash_with_tolerance() -> Result<()> {
- let num_partition = 8usize;
- let active_jobs = mock_active_jobs(num_partition).await?;
- let active_jobs = Arc::new(active_jobs);
- let topology_nodes = mock_topology_nodes();
- let num_replicas = 31;
- let tolerance = 1;
-
- {
- let (bound_tasks, _) = bind_task_consistent_hash(
- topology_nodes,
- num_replicas,
- tolerance,
- active_jobs,
- |job_id, _| mock_get_scan_files("job_b", job_id, 8),
- )
- .await?;
- assert_eq!(7, bound_tasks.len());
-
- let result = get_result(bound_tasks);
-
- let mut expected = HashMap::new();
- {
- let mut entry_b = HashMap::new();
- entry_b.insert("executor_3".to_string(), 3);
- entry_b.insert("executor_2".to_string(), 3);
- entry_b.insert("executor_1".to_string(), 1);
-
- expected.insert("job_b".to_string(), entry_b);
- }
- assert!(
- expected.eq(&result),
- "The result {result:?} is not as expected {expected:?}"
- );
- }
-
- Ok(())
- }
-
fn get_result(
bound_tasks: Vec<BoundTask>,
) -> HashMap<String, HashMap<String, usize>> {
@@ -1061,55 +758,4 @@ mod test {
},
]
}
-
- fn mock_topology_nodes() -> HashMap<String, TopologyNode> {
- let mut topology_nodes = HashMap::new();
- topology_nodes.insert(
- "executor_1".to_string(),
- TopologyNode::new("localhost", 8081, "executor_1", 0, 1),
- );
- topology_nodes.insert(
- "executor_2".to_string(),
- TopologyNode::new("localhost", 8082, "executor_2", 0, 3),
- );
- topology_nodes.insert(
- "executor_3".to_string(),
- TopologyNode::new("localhost", 8083, "executor_3", 0, 5),
- );
- topology_nodes
- }
-
- fn mock_get_scan_files(
- expected_job_id: &str,
- job_id: &str,
- num_partition: usize,
- ) -> datafusion::common::Result<Vec<Vec<Vec<PartitionedFile>>>> {
- Ok(if expected_job_id.eq(job_id) {
- mock_scan_files(num_partition)
- } else {
- vec![]
- })
- }
-
- fn mock_scan_files(num_partition: usize) -> Vec<Vec<Vec<PartitionedFile>>>
{
- let mut scan_files = vec![];
- for i in 0..num_partition {
- scan_files.push(vec![PartitionedFile {
- object_meta: ObjectMeta {
- location: Path::from(format!("file--{i}")),
- last_modified: Default::default(),
- size: 1,
- e_tag: None,
- version: None,
- },
- partition_values: vec![],
- range: None,
- extensions: None,
- statistics: None,
- metadata_size_hint: None,
- ordering: None,
- }]);
- }
- vec![scan_files]
- }
}
diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs
index dd73ee9a1..cefe66dd3 100644
--- a/ballista/scheduler/src/config.rs
+++ b/ballista/scheduler/src/config.rs
@@ -435,11 +435,6 @@ pub enum TaskDistribution {
/// Distribute tasks evenly across executors. This will try and iterate
through available executors
/// and assign one task to each executor until all tasks are assigned.
RoundRobin,
- /// 1. Firstly, try to bind tasks without scanning source files by
`RoundRobin` policy.
- /// 2. Then for a task for scanning source files, firstly calculate a hash
value based on input files.
- /// And then bind it with an execute according to consistent hashing
policy.
- /// 3. If needed, work stealing can be enabled based on the tolerance of
the consistent hashing.
- ConsistentHash,
}
impl Display for TaskDistribution {
@@ -447,7 +442,6 @@ impl Display for TaskDistribution {
match self {
TaskDistribution::Bias => f.write_str("bias"),
TaskDistribution::RoundRobin => f.write_str("round-robin"),
- TaskDistribution::ConsistentHash => f.write_str("consistent-hash"),
}
}
}
@@ -471,16 +465,6 @@ pub enum TaskDistributionPolicy {
/// Distribute tasks evenly across executors. This will try and iterate
through available executors
/// and assign one task to each executor until all tasks are assigned.
RoundRobin,
- /// 1. Firstly, try to bind tasks without scanning source files by
`RoundRobin` policy.
- /// 2. Then for a task for scanning source files, firstly calculate a hash
value based on input files.
- /// And then bind it with an execute according to consistent hashing
policy.
- /// 3. If needed, work stealing can be enabled based on the tolerance of
the consistent hashing.
- ConsistentHash {
- /// Number of virtual nodes per executor on the consistent hash ring.
- num_replicas: usize,
- /// Tolerance for work stealing when slots are imbalanced.
- tolerance: usize,
- },
/// User provided task distribution policy
Custom(Arc<dyn DistributionPolicy>),
}
@@ -493,14 +477,6 @@ impl TryFrom<Config> for SchedulerConfig {
let task_distribution = match opt.task_distribution {
TaskDistribution::Bias => TaskDistributionPolicy::Bias,
TaskDistribution::RoundRobin => TaskDistributionPolicy::RoundRobin,
- TaskDistribution::ConsistentHash => {
- let num_replicas = opt.consistent_hash_num_replicas as usize;
- let tolerance = opt.consistent_hash_tolerance as usize;
- TaskDistributionPolicy::ConsistentHash {
- num_replicas,
- tolerance,
- }
- }
};
let config = SchedulerConfig {
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs
b/ballista/scheduler/src/scheduler_server/grpc.rs
index 4e1e9e5b1..3320934ba 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -134,11 +134,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerGrpc
TaskDistributionPolicy::RoundRobin => {
bind_task_round_robin(available_slots, running_jobs, |_|
false).await
}
- TaskDistributionPolicy::ConsistentHash { .. } => {
- return Err(Status::unimplemented(
- "ConsistentHash TaskDistribution is not feasible for
pull-based task scheduling",
- ));
- }
TaskDistributionPolicy::Custom(ref policy) => policy
.bind_tasks(available_slots, running_jobs)
diff --git a/docs/source/user-guide/configs.md
b/docs/source/user-guide/configs.md
index 56b847e13..dbc996eb4 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -103,6 +103,6 @@ _Example: Specifying configuration options when starting
the scheduler_
| -------------------------------------------- | ------ | ----------- |
--------------------------------------------------------------------------------------------------------------------------
|
| scheduler-policy | Utf8 | pull-staged | Sets
the task scheduling policy for the scheduler, possible values: pull-staged,
push-staged. |
| event-loop-buffer-size | UInt32 | 10000 | Sets
the event loop buffer size. for a system of high throughput, a larger value
like 1000000 is recommended. |
-| task-distribution | Utf8 | bias | Sets
the task distribution policy for the scheduler, possible values: bias,
round-robin, consistent-hash. |
+| task-distribution | Utf8 | bias | Sets
the task distribution policy for the scheduler, possible values: bias,
round-robin |
| finished-job-data-clean-up-interval-seconds | UInt64 | 300 | Sets
the delayed interval for cleaning up finished job data, mainly the shuffle
data, 0 means the cleaning up is disabled. |
| finished-job-state-clean-up-interval-seconds | UInt64 | 3600 | Sets
the delayed interval for cleaning up finished job state stored in the backend,
0 means the cleaning up is disabled. |
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]