This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 80c2c56c chore: dependency cleanup (#1150)
80c2c56c is described below
commit 80c2c56cf00448dac3dae3e66a80c2b4450ad17b
Author: Marko Milenković <[email protected]>
AuthorDate: Sat Dec 14 13:29:25 2024 +0000
chore: dependency cleanup (#1150)
---
Cargo.toml | 6 +-
ballista-cli/Cargo.toml | 1 -
ballista/client/Cargo.toml | 8 +-
ballista/core/Cargo.toml | 9 +-
ballista/core/src/config.rs | 25 ++--
ballista/core/src/diagram.rs | 148 +++++++++++++++++++++
ballista/core/src/error.rs | 48 +------
ballista/core/src/lib.rs | 5 +-
ballista/core/src/utils.rs | 129 +-----------------
ballista/executor/Cargo.toml | 13 +-
ballista/executor/build.rs | 8 +-
ballista/executor/src/bin/main.rs | 3 +-
ballista/executor/src/config.rs | 1 +
ballista/executor/src/execution_loop.rs | 20 ++-
ballista/executor/src/executor.rs | 4 +-
ballista/executor/src/executor_process.rs | 39 ++++--
ballista/executor/src/flight_service.rs | 12 +-
ballista/executor/src/lib.rs | 1 +
ballista/scheduler/Cargo.toml | 22 ++-
ballista/scheduler/build.rs | 4 +-
ballista/scheduler/src/bin/main.rs | 11 +-
ballista/scheduler/src/cluster/memory.rs | 3 +-
ballista/scheduler/src/cluster/mod.rs | 7 +-
ballista/scheduler/src/config.rs | 72 ++--------
ballista/scheduler/src/scheduler_process.rs | 8 +-
.../src/scheduler_server/query_stage_scheduler.rs | 5 -
ballista/scheduler/src/state/task_manager.rs | 4 +-
benchmarks/Cargo.toml | 1 -
examples/Cargo.toml | 6 +-
examples/examples/custom-executor.rs | 36 ++---
examples/examples/custom-scheduler.rs | 46 +++----
python/Cargo.toml | 7 +-
python/src/cluster.rs | 10 +-
33 files changed, 320 insertions(+), 402 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index a68222a2..f9206458 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -21,7 +21,6 @@ members = ["ballista-cli", "ballista/client",
"ballista/core", "ballista/executo
resolver = "2"
[workspace.dependencies]
-anyhow = "1"
arrow = { version = "53", features = ["ipc_compression"] }
arrow-flight = { version = "53", features = ["flight-sql-experimental"] }
clap = { version = "4.5", features = ["derive", "cargo"] }
@@ -40,9 +39,9 @@ tonic-build = { version = "0.12", default-features = false,
features = [
"transport",
"prost"
] }
-tracing = "0.1.36"
+tracing = "0.1"
tracing-appender = "0.2.2"
-tracing-subscriber = { version = "0.3.15", features = ["env-filter"] }
+tracing-subscriber = { version = "0.3", features = ["env-filter"] }
ctor = { version = "0.2" }
mimalloc = { version = "0.1" }
@@ -58,7 +57,6 @@ dashmap = { version = "6.1" }
async-trait = { version = "0.1.4" }
serde = { version = "1.0" }
tokio-stream = { version = "0.1" }
-parse_arg = { version = "0.1" }
url = { version = "2.5" }
# cargo build --profile release-lto
diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml
index 2f1ddeb0..9b527e56 100644
--- a/ballista-cli/Cargo.toml
+++ b/ballista-cli/Cargo.toml
@@ -25,7 +25,6 @@ keywords = ["ballista", "cli"]
license = "Apache-2.0"
homepage = "https://github.com/apache/arrow-ballista"
repository = "https://github.com/apache/arrow-ballista"
-rust-version = "1.72"
readme = "README.md"
[dependencies]
diff --git a/ballista/client/Cargo.toml b/ballista/client/Cargo.toml
index 9614412f..e462367a 100644
--- a/ballista/client/Cargo.toml
+++ b/ballista/client/Cargo.toml
@@ -25,7 +25,6 @@ repository = "https://github.com/apache/arrow-ballista"
readme = "README.md"
authors = ["Apache DataFusion <[email protected]>"]
edition = "2021"
-rust-version = "1.72"
[dependencies]
async-trait = { workspace = true }
@@ -33,11 +32,8 @@ ballista-core = { path = "../core", version = "0.12.0" }
ballista-executor = { path = "../executor", version = "0.12.0", optional =
true }
ballista-scheduler = { path = "../scheduler", version = "0.12.0", optional =
true }
datafusion = { workspace = true }
-datafusion-proto = { workspace = true }
-futures = { workspace = true }
log = { workspace = true }
-parking_lot = { workspace = true }
-tempfile = { workspace = true }
+
tokio = { workspace = true }
url = { workspace = true }
@@ -45,8 +41,10 @@ url = { workspace = true }
ballista-executor = { path = "../executor", version = "0.12.0" }
ballista-scheduler = { path = "../scheduler", version = "0.12.0" }
ctor = { workspace = true }
+datafusion-proto = { workspace = true }
env_logger = { workspace = true }
rstest = { version = "0.23" }
+tempfile = { workspace = true }
tonic = { workspace = true }
[features]
diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml
index 80a3d102..1bf88858 100644
--- a/ballista/core/Cargo.toml
+++ b/ballista/core/Cargo.toml
@@ -34,25 +34,24 @@ exclude = ["*.proto"]
rustc-args = ["--cfg", "docsrs"]
[features]
+build-binary = ["configure_me", "clap"]
docsrs = []
# Used for testing ONLY: causes all values to hash to the same value (test for
collisions)
force_hash_collisions = ["datafusion/force_hash_collisions"]
-
[dependencies]
arrow-flight = { workspace = true }
async-trait = { workspace = true }
chrono = { version = "0.4", default-features = false }
-clap = { workspace = true }
+clap = { workspace = true, optional = true }
+configure_me = { workspace = true, optional = true }
datafusion = { workspace = true }
datafusion-proto = { workspace = true }
datafusion-proto-common = { workspace = true }
futures = { workspace = true }
-
itertools = "0.13"
log = { workspace = true }
md-5 = { version = "^0.10.0" }
-parse_arg = { workspace = true }
prost = { workspace = true }
prost-types = { workspace = true }
rand = { workspace = true }
@@ -66,5 +65,5 @@ url = { workspace = true }
tempfile = { workspace = true }
[build-dependencies]
-rustc_version = "0.4.0"
+rustc_version = "0.4.1"
tonic-build = { workspace = true }
diff --git a/ballista/core/src/config.rs b/ballista/core/src/config.rs
index e00cd115..cb7f7c5d 100644
--- a/ballista/core/src/config.rs
+++ b/ballista/core/src/config.rs
@@ -18,8 +18,6 @@
//! Ballista configuration
-use clap::ValueEnum;
-use core::fmt;
use std::collections::HashMap;
use std::result;
@@ -252,30 +250,33 @@ impl datafusion::config::ConfigExtension for
BallistaConfig {
// an enum used to configure the scheduler policy
// needs to be visible to code generated by configure_me
-#[derive(Clone, ValueEnum, Copy, Debug, serde::Deserialize, Default)]
+#[derive(Clone, Copy, Debug, serde::Deserialize, Default)]
+#[cfg_attr(feature = "build-binary", derive(clap::ValueEnum))]
pub enum TaskSchedulingPolicy {
#[default]
PullStaged,
PushStaged,
}
+#[cfg(feature = "build-binary")]
impl std::str::FromStr for TaskSchedulingPolicy {
type Err = String;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
- ValueEnum::from_str(s, true)
+ clap::ValueEnum::from_str(s, true)
}
}
-
-impl parse_arg::ParseArgFromStr for TaskSchedulingPolicy {
- fn describe_type<W: fmt::Write>(mut writer: W) -> fmt::Result {
+#[cfg(feature = "build-binary")]
+impl configure_me::parse_arg::ParseArgFromStr for TaskSchedulingPolicy {
+ fn describe_type<W: core::fmt::Write>(mut writer: W) -> core::fmt::Result {
write!(writer, "The scheduler policy for the scheduler")
}
}
// an enum used to configure the log rolling policy
// needs to be visible to code generated by configure_me
-#[derive(Clone, ValueEnum, Copy, Debug, serde::Deserialize, Default)]
+#[derive(Clone, Copy, Debug, serde::Deserialize, Default)]
+#[cfg_attr(feature = "build-binary", derive(clap::ValueEnum))]
pub enum LogRotationPolicy {
Minutely,
Hourly,
@@ -284,16 +285,18 @@ pub enum LogRotationPolicy {
Never,
}
+#[cfg(feature = "build-binary")]
impl std::str::FromStr for LogRotationPolicy {
type Err = String;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
- ValueEnum::from_str(s, true)
+ clap::ValueEnum::from_str(s, true)
}
}
-impl parse_arg::ParseArgFromStr for LogRotationPolicy {
- fn describe_type<W: fmt::Write>(mut writer: W) -> fmt::Result {
+#[cfg(feature = "build-binary")]
+impl configure_me::parse_arg::ParseArgFromStr for LogRotationPolicy {
+ fn describe_type<W: core::fmt::Write>(mut writer: W) -> core::fmt::Result {
write!(writer, "The log rotation policy")
}
}
diff --git a/ballista/core/src/diagram.rs b/ballista/core/src/diagram.rs
new file mode 100644
index 00000000..9ef0da98
--- /dev/null
+++ b/ballista/core/src/diagram.rs
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::Result;
+use crate::execution_plans::{ShuffleWriterExec, UnresolvedShuffleExec};
+
+use datafusion::datasource::physical_plan::{CsvExec, ParquetExec};
+use datafusion::physical_plan::aggregates::AggregateExec;
+use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
+use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use datafusion::physical_plan::filter::FilterExec;
+use datafusion::physical_plan::joins::HashJoinExec;
+use datafusion::physical_plan::projection::ProjectionExec;
+use datafusion::physical_plan::sorts::sort::SortExec;
+use datafusion::physical_plan::ExecutionPlan;
+use std::fs::File;
+use std::io::{BufWriter, Write};
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+
+pub fn produce_diagram(filename: &str, stages: &[Arc<ShuffleWriterExec>]) ->
Result<()> {
+ let write_file = File::create(filename)?;
+ let mut w = BufWriter::new(&write_file);
+ writeln!(w, "digraph G {{")?;
+
+ // draw stages and entities
+ for stage in stages {
+ writeln!(w, "\tsubgraph cluster{} {{", stage.stage_id())?;
+ writeln!(w, "\t\tlabel = \"Stage {}\";", stage.stage_id())?;
+ let mut id = AtomicUsize::new(0);
+ build_exec_plan_diagram(
+ &mut w,
+ stage.children()[0].as_ref(),
+ stage.stage_id(),
+ &mut id,
+ true,
+ )?;
+ writeln!(w, "\t}}")?;
+ }
+
+ // draw relationships
+ for stage in stages {
+ let mut id = AtomicUsize::new(0);
+ build_exec_plan_diagram(
+ &mut w,
+ stage.children()[0].as_ref(),
+ stage.stage_id(),
+ &mut id,
+ false,
+ )?;
+ }
+
+ write!(w, "}}")?;
+ Ok(())
+}
+
+fn build_exec_plan_diagram(
+ w: &mut BufWriter<&File>,
+ plan: &dyn ExecutionPlan,
+ stage_id: usize,
+ id: &mut AtomicUsize,
+ draw_entity: bool,
+) -> Result<usize> {
+ let operator_str = if
plan.as_any().downcast_ref::<AggregateExec>().is_some() {
+ "AggregateExec"
+ } else if plan.as_any().downcast_ref::<SortExec>().is_some() {
+ "SortExec"
+ } else if plan.as_any().downcast_ref::<ProjectionExec>().is_some() {
+ "ProjectionExec"
+ } else if plan.as_any().downcast_ref::<HashJoinExec>().is_some() {
+ "HashJoinExec"
+ } else if plan.as_any().downcast_ref::<ParquetExec>().is_some() {
+ "ParquetExec"
+ } else if plan.as_any().downcast_ref::<CsvExec>().is_some() {
+ "CsvExec"
+ } else if plan.as_any().downcast_ref::<FilterExec>().is_some() {
+ "FilterExec"
+ } else if plan.as_any().downcast_ref::<ShuffleWriterExec>().is_some() {
+ "ShuffleWriterExec"
+ } else if plan
+ .as_any()
+ .downcast_ref::<UnresolvedShuffleExec>()
+ .is_some()
+ {
+ "UnresolvedShuffleExec"
+ } else if plan
+ .as_any()
+ .downcast_ref::<CoalesceBatchesExec>()
+ .is_some()
+ {
+ "CoalesceBatchesExec"
+ } else if plan
+ .as_any()
+ .downcast_ref::<CoalescePartitionsExec>()
+ .is_some()
+ {
+ "CoalescePartitionsExec"
+ } else {
+ println!("Unknown: {plan:?}");
+ "Unknown"
+ };
+
+ let node_id = id.load(Ordering::SeqCst);
+ id.store(node_id + 1, Ordering::SeqCst);
+
+ if draw_entity {
+ writeln!(
+ w,
+ "\t\tstage_{stage_id}_exec_{node_id} [shape=box,
label=\"{operator_str}\"];"
+ )?;
+ }
+ for child in plan.children() {
+ if let Some(shuffle) =
child.as_any().downcast_ref::<UnresolvedShuffleExec>() {
+ if !draw_entity {
+ writeln!(
+ w,
+ "\tstage_{}_exec_1 -> stage_{}_exec_{};",
+ shuffle.stage_id, stage_id, node_id
+ )?;
+ }
+ } else {
+ // relationships within same entity
+ let child_id =
+ build_exec_plan_diagram(w, child.as_ref(), stage_id, id,
draw_entity)?;
+ if draw_entity {
+ writeln!(
+ w,
+ "\t\tstage_{stage_id}_exec_{child_id} ->
stage_{stage_id}_exec_{node_id};"
+ )?;
+ }
+ }
+ }
+ Ok(node_id)
+}
diff --git a/ballista/core/src/error.rs b/ballista/core/src/error.rs
index cbdd90a7..05a706cc 100644
--- a/ballista/core/src/error.rs
+++ b/ballista/core/src/error.rs
@@ -37,15 +37,11 @@ pub enum BallistaError {
NotImplemented(String),
General(String),
Internal(String),
+ Configuration(String),
ArrowError(ArrowError),
DataFusionError(DataFusionError),
SqlError(parser::ParserError),
IoError(io::Error),
- // ReqwestError(reqwest::Error),
- // HttpError(http::Error),
- // KubeAPIError(kube::error::Error),
- // KubeAPIRequestError(k8s_openapi::RequestError),
- // KubeAPIResponseError(k8s_openapi::ResponseError),
TonicError(tonic::transport::Error),
GrpcError(tonic::Status),
GrpcConnectionError(String),
@@ -112,36 +108,6 @@ impl From<io::Error> for BallistaError {
}
}
-// impl From<reqwest::Error> for BallistaError {
-// fn from(e: reqwest::Error) -> Self {
-// BallistaError::ReqwestError(e)
-// }
-// }
-//
-// impl From<http::Error> for BallistaError {
-// fn from(e: http::Error) -> Self {
-// BallistaError::HttpError(e)
-// }
-// }
-
-// impl From<kube::error::Error> for BallistaError {
-// fn from(e: kube::error::Error) -> Self {
-// BallistaError::KubeAPIError(e)
-// }
-// }
-
-// impl From<k8s_openapi::RequestError> for BallistaError {
-// fn from(e: k8s_openapi::RequestError) -> Self {
-// BallistaError::KubeAPIRequestError(e)
-// }
-// }
-
-// impl From<k8s_openapi::ResponseError> for BallistaError {
-// fn from(e: k8s_openapi::ResponseError) -> Self {
-// BallistaError::KubeAPIResponseError(e)
-// }
-// }
-
impl From<tonic::transport::Error> for BallistaError {
fn from(e: tonic::transport::Error) -> Self {
BallistaError::TonicError(e)
@@ -191,15 +157,6 @@ impl Display for BallistaError {
}
BallistaError::SqlError(ref desc) => write!(f, "SQL error:
{desc}"),
BallistaError::IoError(ref desc) => write!(f, "IO error: {desc}"),
- // BallistaError::ReqwestError(ref desc) => write!(f, "Reqwest
error: {}", desc),
- // BallistaError::HttpError(ref desc) => write!(f, "HTTP error:
{}", desc),
- // BallistaError::KubeAPIError(ref desc) => write!(f, "Kube API
error: {}", desc),
- // BallistaError::KubeAPIRequestError(ref desc) => {
- // write!(f, "KubeAPI request error: {}", desc)
- // }
- // BallistaError::KubeAPIResponseError(ref desc) => {
- // write!(f, "KubeAPI response error: {}", desc)
- // }
BallistaError::TonicError(desc) => write!(f, "Tonic error:
{desc}"),
BallistaError::GrpcError(desc) => write!(f, "Grpc error: {desc}"),
BallistaError::GrpcConnectionError(desc) => {
@@ -220,6 +177,9 @@ impl Display for BallistaError {
)
}
BallistaError::Cancelled => write!(f, "Task cancelled"),
+ BallistaError::Configuration(desc) => {
+ write!(f, "Configuration error: {desc}")
+ }
}
}
}
diff --git a/ballista/core/src/lib.rs b/ballista/core/src/lib.rs
index c2d92d35..7864d56e 100644
--- a/ballista/core/src/lib.rs
+++ b/ballista/core/src/lib.rs
@@ -29,15 +29,14 @@ pub fn print_version() {
pub mod client;
pub mod config;
pub mod consistent_hash;
+pub mod diagram;
pub mod error;
pub mod event_loop;
pub mod execution_plans;
pub mod extension;
pub mod registry;
-pub mod utils;
-
-#[macro_use]
pub mod serde;
+pub mod utils;
///
/// [RuntimeProducer] is a factory which creates runtime [RuntimeEnv]
diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs
index 1506c2bb..14eeb9a2 100644
--- a/ballista/core/src/utils.rs
+++ b/ballista/core/src/utils.rs
@@ -17,9 +17,7 @@
use crate::config::BallistaConfig;
use crate::error::{BallistaError, Result};
-use crate::execution_plans::{
- DistributedQueryExec, ShuffleWriterExec, UnresolvedShuffleExec,
-};
+use crate::execution_plans::DistributedQueryExec;
use crate::extension::SessionConfigExt;
use crate::serde::scheduler::PartitionStats;
@@ -32,29 +30,19 @@ use datafusion::arrow::ipc::writer::StreamWriter;
use datafusion::arrow::ipc::CompressionType;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::tree_node::{TreeNode, TreeNodeVisitor};
-use datafusion::datasource::physical_plan::{CsvExec, ParquetExec};
use datafusion::error::DataFusionError;
use datafusion::execution::context::{QueryPlanner, SessionConfig,
SessionState};
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::logical_expr::{DdlStatement, LogicalPlan, TableScan};
-use datafusion::physical_plan::aggregates::AggregateExec;
-use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
-use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::empty::EmptyExec;
-use datafusion::physical_plan::filter::FilterExec;
-use datafusion::physical_plan::joins::HashJoinExec;
use datafusion::physical_plan::metrics::MetricsSet;
-use datafusion::physical_plan::projection::ProjectionExec;
-use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::{metrics, ExecutionPlan, RecordBatchStream};
use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
use datafusion_proto::logical_plan::{AsLogicalPlan, LogicalExtensionCodec};
use futures::StreamExt;
use log::error;
-use std::io::{BufWriter, Write};
use std::marker::PhantomData;
-use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::{fs::File, pin::Pin};
@@ -129,121 +117,6 @@ pub async fn collect_stream(
Ok(batches)
}
-pub fn produce_diagram(filename: &str, stages: &[Arc<ShuffleWriterExec>]) ->
Result<()> {
- let write_file = File::create(filename)?;
- let mut w = BufWriter::new(&write_file);
- writeln!(w, "digraph G {{")?;
-
- // draw stages and entities
- for stage in stages {
- writeln!(w, "\tsubgraph cluster{} {{", stage.stage_id())?;
- writeln!(w, "\t\tlabel = \"Stage {}\";", stage.stage_id())?;
- let mut id = AtomicUsize::new(0);
- build_exec_plan_diagram(
- &mut w,
- stage.children()[0].as_ref(),
- stage.stage_id(),
- &mut id,
- true,
- )?;
- writeln!(w, "\t}}")?;
- }
-
- // draw relationships
- for stage in stages {
- let mut id = AtomicUsize::new(0);
- build_exec_plan_diagram(
- &mut w,
- stage.children()[0].as_ref(),
- stage.stage_id(),
- &mut id,
- false,
- )?;
- }
-
- write!(w, "}}")?;
- Ok(())
-}
-
-fn build_exec_plan_diagram(
- w: &mut BufWriter<&File>,
- plan: &dyn ExecutionPlan,
- stage_id: usize,
- id: &mut AtomicUsize,
- draw_entity: bool,
-) -> Result<usize> {
- let operator_str = if
plan.as_any().downcast_ref::<AggregateExec>().is_some() {
- "AggregateExec"
- } else if plan.as_any().downcast_ref::<SortExec>().is_some() {
- "SortExec"
- } else if plan.as_any().downcast_ref::<ProjectionExec>().is_some() {
- "ProjectionExec"
- } else if plan.as_any().downcast_ref::<HashJoinExec>().is_some() {
- "HashJoinExec"
- } else if plan.as_any().downcast_ref::<ParquetExec>().is_some() {
- "ParquetExec"
- } else if plan.as_any().downcast_ref::<CsvExec>().is_some() {
- "CsvExec"
- } else if plan.as_any().downcast_ref::<FilterExec>().is_some() {
- "FilterExec"
- } else if plan.as_any().downcast_ref::<ShuffleWriterExec>().is_some() {
- "ShuffleWriterExec"
- } else if plan
- .as_any()
- .downcast_ref::<UnresolvedShuffleExec>()
- .is_some()
- {
- "UnresolvedShuffleExec"
- } else if plan
- .as_any()
- .downcast_ref::<CoalesceBatchesExec>()
- .is_some()
- {
- "CoalesceBatchesExec"
- } else if plan
- .as_any()
- .downcast_ref::<CoalescePartitionsExec>()
- .is_some()
- {
- "CoalescePartitionsExec"
- } else {
- println!("Unknown: {plan:?}");
- "Unknown"
- };
-
- let node_id = id.load(Ordering::SeqCst);
- id.store(node_id + 1, Ordering::SeqCst);
-
- if draw_entity {
- writeln!(
- w,
- "\t\tstage_{stage_id}_exec_{node_id} [shape=box,
label=\"{operator_str}\"];"
- )?;
- }
- for child in plan.children() {
- if let Some(shuffle) =
child.as_any().downcast_ref::<UnresolvedShuffleExec>() {
- if !draw_entity {
- writeln!(
- w,
- "\tstage_{}_exec_1 -> stage_{}_exec_{};",
- shuffle.stage_id, stage_id, node_id
- )?;
- }
- } else {
- // relationships within same entity
- let child_id =
- build_exec_plan_diagram(w, child.as_ref(), stage_id, id,
draw_entity)?;
- if draw_entity {
- writeln!(
- w,
- "\t\tstage_{stage_id}_exec_{child_id} ->
stage_{stage_id}_exec_{node_id};"
- )?;
- }
- }
- }
- Ok(node_id)
-}
-
pub struct BallistaQueryPlanner<T: AsLogicalPlan> {
scheduler_url: String,
config: BallistaConfig,
diff --git a/ballista/executor/Cargo.toml b/ballista/executor/Cargo.toml
index abe256eb..6a2dfa61 100644
--- a/ballista/executor/Cargo.toml
+++ b/ballista/executor/Cargo.toml
@@ -32,17 +32,18 @@ executor = "executor_config_spec.toml"
[[bin]]
name = "ballista-executor"
path = "src/bin/main.rs"
+required-features = ["build-binary"]
[features]
-default = ["mimalloc"]
+build-binary = ["configure_me", "tracing-subscriber", "tracing-appender",
"tracing", "ballista-core/build-binary"]
+default = ["build-binary", "mimalloc"]
[dependencies]
-anyhow = { workspace = true }
arrow = { workspace = true }
arrow-flight = { workspace = true }
async-trait = { workspace = true }
ballista-core = { path = "../core", version = "0.12.0" }
-configure_me = { workspace = true }
+configure_me = { workspace = true, optional = true }
dashmap = { workspace = true }
datafusion = { workspace = true }
datafusion-proto = { workspace = true }
@@ -60,9 +61,9 @@ tokio = { workspace = true, features = [
] }
tokio-stream = { workspace = true, features = ["net"] }
tonic = { workspace = true }
-tracing = { workspace = true }
-tracing-appender = { workspace = true }
-tracing-subscriber = { workspace = true }
+tracing = { workspace = true, optional = true }
+tracing-appender = { workspace = true, optional = true }
+tracing-subscriber = { workspace = true, optional = true }
uuid = { workspace = true }
[dev-dependencies]
diff --git a/ballista/executor/build.rs b/ballista/executor/build.rs
index 7d2b9b87..21ce2d8f 100644
--- a/ballista/executor/build.rs
+++ b/ballista/executor/build.rs
@@ -15,10 +15,12 @@
// specific language governing permissions and limitations
// under the License.
-extern crate configure_me_codegen;
-
fn main() -> Result<(), String> {
+ #[cfg(feature = "build-binary")]
println!("cargo:rerun-if-changed=executor_config_spec.toml");
+ #[cfg(feature = "build-binary")]
configure_me_codegen::build_script_auto()
- .map_err(|e| format!("configure_me code generation failed: {e}"))
+ .map_err(|e| format!("configure_me code generation failed: {e}"))?;
+
+ Ok(())
}
diff --git a/ballista/executor/src/bin/main.rs
b/ballista/executor/src/bin/main.rs
index 2ab1a90c..18abb996 100644
--- a/ballista/executor/src/bin/main.rs
+++ b/ballista/executor/src/bin/main.rs
@@ -17,7 +17,6 @@
//! Ballista Rust executor binary.
-use anyhow::Result;
use ballista_core::config::LogRotationPolicy;
use ballista_core::print_version;
use ballista_executor::config::prelude::*;
@@ -33,7 +32,7 @@ use tracing_subscriber::EnvFilter;
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
#[tokio::main]
-async fn main() -> Result<()> {
+async fn main() -> ballista_core::error::Result<()> {
// parse command-line arguments
let (opt, _remaining_args) =
Config::including_optional_config_files(&["/etc/ballista/executor.toml"])
diff --git a/ballista/executor/src/config.rs b/ballista/executor/src/config.rs
index 65fa9d46..91b54732 100644
--- a/ballista/executor/src/config.rs
+++ b/ballista/executor/src/config.rs
@@ -21,6 +21,7 @@ use crate::executor_process::ExecutorProcessConfig;
// Ideally we would use the include_config macro from configure_me, but then
we cannot use
// #[allow(clippy::all)] to silence clippy warnings from the generated code
+
include!(concat!(env!("OUT_DIR"), "/executor_configure_me_config.rs"));
impl TryFrom<Config> for ExecutorProcessConfig {
diff --git a/ballista/executor/src/execution_loop.rs
b/ballista/executor/src/execution_loop.rs
index 649b366b..2094425d 100644
--- a/ballista/executor/src/execution_loop.rs
+++ b/ballista/executor/src/execution_loop.rs
@@ -77,16 +77,14 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U:
'static + AsExecutionPlan>
let task_status: Vec<TaskStatus> =
sample_tasks_status(&mut task_status_receiver).await;
- let poll_work_result: anyhow::Result<
- tonic::Response<PollWorkResult>,
- tonic::Status,
- > = scheduler
- .poll_work(PollWorkParams {
- metadata: Some(executor.metadata.clone()),
- num_free_slots: available_task_slots.available_permits() as
u32,
- task_status,
- })
- .await;
+ let poll_work_result: Result<tonic::Response<PollWorkResult>,
tonic::Status> =
+ scheduler
+ .poll_work(PollWorkParams {
+ metadata: Some(executor.metadata.clone()),
+ num_free_slots: available_task_slots.available_permits()
as u32,
+ task_status,
+ })
+ .await;
match poll_work_result {
Ok(result) => {
@@ -274,7 +272,7 @@ async fn sample_tasks_status(
loop {
match task_status_receiver.try_recv() {
- anyhow::Result::Ok(status) => {
+ Result::Ok(status) => {
task_status.push(status);
}
Err(TryRecvError::Empty) => {
diff --git a/ballista/executor/src/executor.rs
b/ballista/executor/src/executor.rs
index 1ebf3e56..1b029e17 100644
--- a/ballista/executor/src/executor.rs
+++ b/ballista/executor/src/executor.rs
@@ -215,13 +215,13 @@ impl Executor {
mod test {
use crate::execution_engine::DefaultQueryStageExec;
use crate::executor::Executor;
- use arrow::datatypes::{Schema, SchemaRef};
- use arrow::record_batch::RecordBatch;
use ballista_core::execution_plans::ShuffleWriterExec;
use ballista_core::serde::protobuf::ExecutorRegistration;
use ballista_core::serde::scheduler::PartitionId;
use ballista_core::utils::default_config_producer;
use ballista_core::RuntimeProducer;
+ use datafusion::arrow::datatypes::{Schema, SchemaRef};
+ use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::TaskContext;
diff --git a/ballista/executor/src/executor_process.rs
b/ballista/executor/src/executor_process.rs
index ed690288..e350f391 100644
--- a/ballista/executor/src/executor_process.rs
+++ b/ballista/executor/src/executor_process.rs
@@ -22,7 +22,6 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::{Duration, Instant, UNIX_EPOCH};
-use anyhow::{Context, Result};
use arrow_flight::flight_service_server::FlightServiceServer;
use ballista_core::registry::BallistaFunctionRegistry;
use datafusion_proto::logical_plan::LogicalExtensionCodec;
@@ -148,11 +147,13 @@ impl Default for ExecutorProcessConfig {
}
}
-pub async fn start_executor_process(opt: Arc<ExecutorProcessConfig>) ->
Result<()> {
+pub async fn start_executor_process(
+ opt: Arc<ExecutorProcessConfig>,
+) -> ballista_core::error::Result<()> {
let addr = format!("{}:{}", opt.bind_host, opt.port);
- let addr = addr
- .parse()
- .with_context(|| format!("Could not parse address: {addr}"))?;
+ let addr = addr.parse().map_err(|e: std::net::AddrParseError| {
+ BallistaError::Configuration(e.to_string())
+ })?;
let scheduler_host = opt.scheduler_host.clone();
let scheduler_port = opt.scheduler_port;
@@ -237,7 +238,11 @@ pub async fn start_executor_process(opt:
Arc<ExecutorProcessConfig>) -> Result<(
let connection = if connect_timeout == 0 {
create_grpc_client_connection(scheduler_url)
.await
- .context("Could not connect to scheduler")
+ .map_err(|_| {
+ BallistaError::GrpcConnectionError(
+ "Could not connect to scheduler".to_string(),
+ )
+ })
} else {
// this feature was added to support docker-compose so that we can
have the executor
// wait for the scheduler to start, or at least run for 10 seconds
before failing so
@@ -249,8 +254,11 @@ pub async fn start_executor_process(opt:
Arc<ExecutorProcessConfig>) -> Result<(
{
match create_grpc_client_connection(scheduler_url.clone())
.await
- .context("Could not connect to scheduler")
- {
+ .map_err(|_| {
+ BallistaError::GrpcConnectionError(
+ "Could not connect to scheduler".to_string(),
+ )
+ }) {
Ok(conn) => {
info!("Connected to scheduler at {}", scheduler_url);
x = Some(conn);
@@ -268,8 +276,7 @@ pub async fn start_executor_process(opt:
Arc<ExecutorProcessConfig>) -> Result<(
Some(conn) => Ok(conn),
_ => Err(BallistaError::General(format!(
"Timed out attempting to connect to scheduler at
{scheduler_url}"
- ))
- .into()),
+ ))),
}
}?;
@@ -489,7 +496,10 @@ async fn check_services(
/// This function will be scheduled periodically for cleanup the job shuffle
data left on the executor.
/// Only directories will be checked cleaned.
-async fn clean_shuffle_data_loop(work_dir: &str, seconds: u64) -> Result<()> {
+async fn clean_shuffle_data_loop(
+ work_dir: &str,
+ seconds: u64,
+) -> ballista_core::error::Result<()> {
let mut dir = fs::read_dir(work_dir).await?;
let mut to_deleted = Vec::new();
while let Some(child) = dir.next_entry().await? {
@@ -527,7 +537,7 @@ async fn clean_shuffle_data_loop(work_dir: &str, seconds:
u64) -> Result<()> {
}
/// This function will clean up all shuffle data on this executor
-async fn clean_all_shuffle_data(work_dir: &str) -> Result<()> {
+async fn clean_all_shuffle_data(work_dir: &str) ->
ballista_core::error::Result<()> {
let mut dir = fs::read_dir(work_dir).await?;
let mut to_deleted = Vec::new();
while let Some(child) = dir.next_entry().await? {
@@ -552,7 +562,10 @@ async fn clean_all_shuffle_data(work_dir: &str) ->
Result<()> {
/// Determines if a directory contains files newer than the cutoff time.
/// If return true, it means the directory contains files newer than the
cutoff time. It satisfy the ttl and should not be deleted.
-pub async fn satisfy_dir_ttl(dir: DirEntry, ttl_seconds: u64) -> Result<bool> {
+pub async fn satisfy_dir_ttl(
+ dir: DirEntry,
+ ttl_seconds: u64,
+) -> ballista_core::error::Result<bool> {
let cutoff = get_time_before(ttl_seconds);
let mut to_check = vec![dir];
diff --git a/ballista/executor/src/flight_service.rs
b/ballista/executor/src/flight_service.rs
index a96a752c..939b5a8f 100644
--- a/ballista/executor/src/flight_service.rs
+++ b/ballista/executor/src/flight_service.rs
@@ -17,24 +17,24 @@
//! Implementation of the Apache Arrow Flight protocol that wraps an executor.
-use arrow::ipc::reader::StreamReader;
+use datafusion::arrow::ipc::reader::StreamReader;
use std::convert::TryFrom;
use std::fs::File;
use std::pin::Pin;
-use arrow::ipc::CompressionType;
use arrow_flight::encode::FlightDataEncoderBuilder;
use arrow_flight::error::FlightError;
use ballista_core::error::BallistaError;
use ballista_core::serde::decode_protobuf;
use ballista_core::serde::scheduler::Action as BallistaAction;
+use datafusion::arrow::ipc::CompressionType;
-use arrow::ipc::writer::IpcWriteOptions;
use arrow_flight::{
flight_service_server::FlightService, Action, ActionType, Criteria, Empty,
FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
HandshakeResponse,
PollInfo, PutResult, SchemaResult, Ticket,
};
+use datafusion::arrow::ipc::writer::IpcWriteOptions;
use datafusion::arrow::{error::ArrowError, record_batch::RecordBatch};
use futures::{Stream, StreamExt, TryStreamExt};
use log::{debug, info};
@@ -45,7 +45,6 @@ use tokio::{sync::mpsc::Sender, task};
use tokio_stream::wrappers::ReceiverStream;
use tonic::metadata::MetadataValue;
use tonic::{Request, Response, Status, Streaming};
-use tracing::warn;
/// Service implementing the Apache Arrow Flight Protocol
#[derive(Clone)]
@@ -103,7 +102,10 @@ impl FlightService for BallistaFlightService {
let schema = reader.schema();
task::spawn_blocking(move || {
if let Err(e) = read_partition(reader, tx) {
- warn!(error = %e, "error streaming shuffle partition");
+ log::warn!(
+ "error streaming shuffle partition: {}",
+ e.to_string()
+ );
}
});
diff --git a/ballista/executor/src/lib.rs b/ballista/executor/src/lib.rs
index f0284cbd..23e68f85 100644
--- a/ballista/executor/src/lib.rs
+++ b/ballista/executor/src/lib.rs
@@ -18,6 +18,7 @@
#![doc = include_str!("../README.md")]
pub mod collect;
+#[cfg(feature = "build-binary")]
pub mod config;
pub mod execution_engine;
pub mod execution_loop;
diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml
index ad3e0963..fc3ca09a 100644
--- a/ballista/scheduler/Cargo.toml
+++ b/ballista/scheduler/Cargo.toml
@@ -32,34 +32,33 @@ scheduler = "scheduler_config_spec.toml"
[[bin]]
name = "ballista-scheduler"
path = "src/bin/main.rs"
+required-features = ["build-binary"]
[features]
-default = []
-flight-sql = []
+build-binary = ["configure_me", "clap", "tracing-subscriber",
"tracing-appender", "tracing", "ballista-core/build-binary"]
+default = ["build-binary"]
+flight-sql = ["base64"]
keda-scaler = []
prometheus-metrics = ["prometheus", "once_cell"]
rest-api = []
[dependencies]
-anyhow = { workspace = true }
arrow-flight = { workspace = true }
async-trait = { workspace = true }
axum = "0.7.7"
ballista-core = { path = "../core", version = "0.12.0" }
-base64 = { version = "0.22" }
-clap = { workspace = true }
-configure_me = { workspace = true }
+base64 = { version = "0.22", optional = true }
+clap = { workspace = true, optional = true }
+configure_me = { workspace = true, optional = true }
dashmap = { workspace = true }
datafusion = { workspace = true }
datafusion-proto = { workspace = true }
futures = { workspace = true }
-graphviz-rust = "0.9.0"
http = "1.1"
log = { workspace = true }
object_store = { workspace = true }
once_cell = { version = "1.16.0", optional = true }
parking_lot = { workspace = true }
-parse_arg = { workspace = true }
prometheus = { version = "0.13", features = ["process"], optional = true }
prost = { workspace = true }
prost-types = { workspace = true }
@@ -68,13 +67,12 @@ serde = { workspace = true, features = ["derive"] }
tokio = { workspace = true, features = ["full"] }
tokio-stream = { workspace = true, features = ["net"] }
tonic = { workspace = true }
-tracing = { workspace = true }
-tracing-appender = { workspace = true }
-tracing-subscriber = { workspace = true }
+tracing = { workspace = true, optional = true }
+tracing-appender = { workspace = true, optional = true }
+tracing-subscriber = { workspace = true, optional = true }
uuid = { workspace = true }
[dev-dependencies]
-ballista-core = { path = "../core", version = "0.12.0" }
[build-dependencies]
configure_me_codegen = { workspace = true }
diff --git a/ballista/scheduler/build.rs b/ballista/scheduler/build.rs
index 5a3e00cc..9f2f123f 100644
--- a/ballista/scheduler/build.rs
+++ b/ballista/scheduler/build.rs
@@ -15,10 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-extern crate configure_me_codegen;
-
fn main() -> Result<(), String> {
+ #[cfg(feature = "build-binary")]
println!("cargo:rerun-if-changed=scheduler_config_spec.toml");
+ #[cfg(feature = "build-binary")]
configure_me_codegen::build_script_auto()
.map_err(|e| format!("configure_me code generation failed: {e}"))?;
diff --git a/ballista/scheduler/src/bin/main.rs
b/ballista/scheduler/src/bin/main.rs
index f6a06328..ea31810a 100644
--- a/ballista/scheduler/src/bin/main.rs
+++ b/ballista/scheduler/src/bin/main.rs
@@ -17,8 +17,8 @@
//! Ballista Rust scheduler binary.
-use anyhow::Result;
use ballista_core::config::LogRotationPolicy;
+use ballista_core::error::BallistaError;
use ballista_core::print_version;
use ballista_scheduler::cluster::BallistaCluster;
use ballista_scheduler::config::{Config, ResultExt};
@@ -27,7 +27,7 @@ use std::sync::Arc;
use std::{env, io};
use tracing_subscriber::EnvFilter;
-fn main() -> Result<()> {
+fn main() -> ballista_core::error::Result<()> {
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_io()
.enable_time()
@@ -37,7 +37,7 @@ fn main() -> Result<()> {
runtime.block_on(inner())
}
-async fn inner() -> Result<()> {
+async fn inner() -> ballista_core::error::Result<()> {
// parse options
let (opt, _remaining_args) =
Config::including_optional_config_files(&["/etc/ballista/scheduler.toml"])
@@ -85,7 +85,10 @@ async fn inner() -> Result<()> {
tracing.init();
}
let addr = format!("{}:{}", opt.bind_host, opt.bind_port);
- let addr = addr.parse()?;
+ let addr = addr.parse().map_err(|e: std::net::AddrParseError| {
+ BallistaError::Configuration(e.to_string())
+ })?;
+
let config = opt.try_into()?;
let cluster = BallistaCluster::new_from_config(&config).await?;
start_server(cluster, addr, Arc::new(config)).await?;
diff --git a/ballista/scheduler/src/cluster/memory.rs
b/ballista/scheduler/src/cluster/memory.rs
index c9eac564..07f646b8 100644
--- a/ballista/scheduler/src/cluster/memory.rs
+++ b/ballista/scheduler/src/cluster/memory.rs
@@ -37,7 +37,7 @@ use crate::scheduler_server::{timestamp_millis,
timestamp_secs, SessionBuilder};
use crate::state::session_manager::create_datafusion_context;
use crate::state::task_manager::JobInfoCache;
use ballista_core::serde::protobuf::job_status::Status;
-use log::{error, info, warn};
+use log::{debug, error, info, warn};
use std::collections::{HashMap, HashSet};
use std::ops::DerefMut;
@@ -45,7 +45,6 @@ use ballista_core::consistent_hash::node::Node;
use datafusion::physical_plan::ExecutionPlan;
use std::sync::Arc;
use tokio::sync::{Mutex, MutexGuard};
-use tracing::debug;
#[derive(Default)]
pub struct InMemoryClusterState {
diff --git a/ballista/scheduler/src/cluster/mod.rs
b/ballista/scheduler/src/cluster/mod.rs
index 94f86969..c54b0cea 100644
--- a/ballista/scheduler/src/cluster/mod.rs
+++ b/ballista/scheduler/src/cluster/mod.rs
@@ -16,7 +16,6 @@
// under the License.
use std::collections::{HashMap, HashSet};
-use std::fmt;
use std::pin::Pin;
use std::sync::Arc;
@@ -69,9 +68,9 @@ impl std::str::FromStr for ClusterStorage {
ValueEnum::from_str(s, true)
}
}
-
-impl parse_arg::ParseArgFromStr for ClusterStorage {
- fn describe_type<W: fmt::Write>(mut writer: W) -> fmt::Result {
+#[cfg(feature = "build-binary")]
+impl configure_me::parse_arg::ParseArgFromStr for ClusterStorage {
+ fn describe_type<W: std::fmt::Write>(mut writer: W) -> std::fmt::Result {
write!(writer, "The cluster storage backend for the scheduler")
}
}
diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs
index 10c6df1d..b221ecb6 100644
--- a/ballista/scheduler/src/config.rs
+++ b/ballista/scheduler/src/config.rs
@@ -19,12 +19,12 @@
//! Ballista scheduler specific configuration
use crate::SessionBuilder;
-use ballista_core::{config::TaskSchedulingPolicy, error::BallistaError,
ConfigProducer};
-use clap::ValueEnum;
+use ballista_core::{config::TaskSchedulingPolicy, ConfigProducer};
use datafusion_proto::logical_plan::LogicalExtensionCodec;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
-use std::{fmt, sync::Arc};
+use std::sync::Arc;
+#[cfg(feature = "build-binary")]
include!(concat!(
env!("OUT_DIR"),
"/scheduler_configure_me_config.rs"
@@ -83,57 +83,6 @@ pub struct SchedulerConfig {
pub override_physical_codec: Option<Arc<dyn PhysicalExtensionCodec>>,
}
-impl std::fmt::Debug for SchedulerConfig {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("SchedulerConfig")
- .field("namespace", &self.namespace)
- .field("external_host", &self.external_host)
- .field("bind_port", &self.bind_port)
- .field("bind_host", &self.bind_host)
- .field("scheduling_policy", &self.scheduling_policy)
- .field("event_loop_buffer_size", &self.event_loop_buffer_size)
- .field("task_distribution", &self.task_distribution)
- .field(
- "finished_job_data_clean_up_interval_seconds",
- &self.finished_job_data_clean_up_interval_seconds,
- )
- .field(
- "finished_job_state_clean_up_interval_seconds",
- &self.finished_job_state_clean_up_interval_seconds,
- )
- .field(
- "advertise_flight_sql_endpoint",
- &self.advertise_flight_sql_endpoint,
- )
- .field("job_resubmit_interval_ms", &self.job_resubmit_interval_ms)
- .field("cluster_storage", &self.cluster_storage)
- .field(
- "executor_termination_grace_period",
- &self.executor_termination_grace_period,
- )
- .field(
- "scheduler_event_expected_processing_duration",
- &self.scheduler_event_expected_processing_duration,
- )
- .field(
- "grpc_server_max_decoding_message_size",
- &self.grpc_server_max_decoding_message_size,
- )
- .field(
- "grpc_server_max_encoding_message_size",
- &self.grpc_server_max_encoding_message_size,
- )
- .field("executor_timeout_seconds", &self.executor_timeout_seconds)
- .field(
- "expire_dead_executor_interval_seconds",
- &self.expire_dead_executor_interval_seconds,
- )
- .field("override_logical_codec", &self.override_logical_codec)
- .field("override_physical_codec", &self.override_physical_codec)
- .finish()
- }
-}
-
impl Default for SchedulerConfig {
fn default() -> Self {
Self {
@@ -261,7 +210,8 @@ pub enum ClusterStorageConfig {
/// Policy of distributing tasks to available executor slots
///
/// It needs to be visible to code generated by configure_me
-#[derive(Clone, ValueEnum, Copy, Debug, serde::Deserialize)]
+#[derive(Clone, Copy, Debug, serde::Deserialize)]
+#[cfg_attr(feature = "build-binary", derive(clap::ValueEnum))]
pub enum TaskDistribution {
/// Eagerly assign tasks to executor slots. This will assign as many task
slots per executor
/// as are currently available
@@ -276,16 +226,18 @@ pub enum TaskDistribution {
ConsistentHash,
}
+#[cfg(feature = "build-binary")]
impl std::str::FromStr for TaskDistribution {
type Err = String;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
- ValueEnum::from_str(s, true)
+ clap::ValueEnum::from_str(s, true)
}
}
-impl parse_arg::ParseArgFromStr for TaskDistribution {
- fn describe_type<W: fmt::Write>(mut writer: W) -> fmt::Result {
+#[cfg(feature = "build-binary")]
+impl configure_me::parse_arg::ParseArgFromStr for TaskDistribution {
+ fn describe_type<W: std::fmt::Write>(mut writer: W) -> std::fmt::Result {
write!(writer, "The executor slots policy for the scheduler")
}
}
@@ -308,9 +260,9 @@ pub enum TaskDistributionPolicy {
tolerance: usize,
},
}
-
+#[cfg(feature = "build-binary")]
impl TryFrom<Config> for SchedulerConfig {
- type Error = BallistaError;
+ type Error = ballista_core::error::BallistaError;
fn try_from(opt: Config) -> Result<Self, Self::Error> {
let task_distribution = match opt.task_distribution {
diff --git a/ballista/scheduler/src/scheduler_process.rs
b/ballista/scheduler/src/scheduler_process.rs
index 393b03b6..bf6d484f 100644
--- a/ballista/scheduler/src/scheduler_process.rs
+++ b/ballista/scheduler/src/scheduler_process.rs
@@ -15,9 +15,9 @@
// specific language governing permissions and limitations
// under the License.
-use anyhow::{Error, Result};
#[cfg(feature = "flight-sql")]
use arrow_flight::flight_service_server::FlightServiceServer;
+use ballista_core::error::BallistaError;
use ballista_core::serde::protobuf::scheduler_grpc_server::SchedulerGrpcServer;
use ballista_core::serde::{
BallistaCodec, BallistaLogicalExtensionCodec,
BallistaPhysicalExtensionCodec,
@@ -43,7 +43,7 @@ pub async fn start_server(
cluster: BallistaCluster,
addr: SocketAddr,
config: Arc<SchedulerConfig>,
-) -> Result<()> {
+) -> ballista_core::error::Result<()> {
info!(
"Ballista v{} Scheduler listening on {:?}",
BALLISTA_VERSION, addr
@@ -109,9 +109,9 @@ pub async fn start_server(
let listener = tokio::net::TcpListener::bind(&addr)
.await
- .map_err(Error::from)?;
+ .map_err(BallistaError::from)?;
axum::serve(listener, final_route)
.await
- .map_err(Error::from)
+ .map_err(BallistaError::from)
}
diff --git a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
index c3f3e7eb..b9b49c7f 100644
--- a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -359,14 +359,9 @@ mod tests {
use datafusion::test_util::scan_empty_with_partitions;
use std::sync::Arc;
use std::time::Duration;
- use tracing_subscriber::EnvFilter;
#[tokio::test]
async fn test_pending_job_metric() -> Result<()> {
- tracing_subscriber::fmt()
- .with_env_filter(EnvFilter::from_default_env())
- .init();
-
let plan = test_plan(10);
let metrics_collector = Arc::new(TestMetricsCollector::default());
diff --git a/ballista/scheduler/src/state/task_manager.rs
b/ballista/scheduler/src/state/task_manager.rs
index 2e5b76b4..cc8442f2 100644
--- a/ballista/scheduler/src/state/task_manager.rs
+++ b/ballista/scheduler/src/state/task_manager.rs
@@ -38,7 +38,7 @@ use dashmap::DashMap;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_proto::logical_plan::AsLogicalPlan;
use datafusion_proto::physical_plan::AsExecutionPlan;
-use log::{debug, error, info, warn};
+use log::{debug, error, info, trace, warn};
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use std::collections::{HashMap, HashSet};
@@ -48,8 +48,6 @@ use std::time::Duration;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::RwLock;
-use tracing::trace;
-
type ActiveJobCache = Arc<DashMap<String, JobInfoCache>>;
// TODO move to configuration file
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 4701e9c3..941ec849 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -25,7 +25,6 @@ homepage = "https://github.com/apache/arrow-ballista"
repository = "https://github.com/apache/arrow-ballista"
license = "Apache-2.0"
publish = false
-rust-version = "1.72"
[features]
ci = []
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index 65d9cd94..743ff826 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -26,7 +26,6 @@ license = "Apache-2.0"
keywords = ["arrow", "distributed", "query", "sql"]
edition = "2021"
publish = false
-rust-version = "1.72"
[[example]]
name = "standalone_sql"
@@ -34,11 +33,10 @@ path = "examples/standalone-sql.rs"
required-features = ["ballista/standalone"]
[dependencies]
-anyhow = { workspace = true }
ballista = { path = "../ballista/client", version = "0.12.0" }
ballista-core = { path = "../ballista/core", version = "0.12.0" }
-ballista-executor = { path = "../ballista/executor", version = "0.12.0" }
-ballista-scheduler = { path = "../ballista/scheduler", version = "0.12.0" }
+ballista-executor = { path = "../ballista/executor", version = "0.12.0",
default-features = false }
+ballista-scheduler = { path = "../ballista/scheduler", version = "0.12.0",
default-features = false }
datafusion = { workspace = true }
env_logger = { workspace = true }
log = { workspace = true }
diff --git a/examples/examples/custom-executor.rs
b/examples/examples/custom-executor.rs
index df3f7c24..53418212 100644
--- a/examples/examples/custom-executor.rs
+++ b/examples/examples/custom-executor.rs
@@ -15,11 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-use anyhow::Result;
use ballista_examples::object_store::{
custom_runtime_env_with_s3_support, custom_session_config_with_s3_options,
};
-use ballista_executor::config::prelude::*;
+
use ballista_executor::executor_process::{
start_executor_process, ExecutorProcessConfig,
};
@@ -31,34 +30,23 @@ use std::sync::Arc;
/// This example demonstrates how to create custom ballista executors.
///
#[tokio::main]
-async fn main() -> Result<()> {
+async fn main() -> ballista_core::error::Result<()> {
let _ = env_logger::builder()
.filter_level(log::LevelFilter::Info)
.is_test(true)
.try_init();
- let (opt, _remaining_args) =
-
Config::including_optional_config_files(&["/etc/ballista/executor.toml"])
- .unwrap_or_exit();
-
- if opt.version {
- ballista_core::print_version();
- std::process::exit(0);
- }
-
- let mut config: ExecutorProcessConfig = opt.try_into().unwrap();
-
- // overriding default config producer with custom producer
- // which has required S3 configuration options
- config.override_config_producer =
- Some(Arc::new(custom_session_config_with_s3_options));
-
- // overriding default runtime producer with custom producer
- // which knows how to create S3 connections
- config.override_runtime_producer =
- Some(Arc::new(|session_config: &SessionConfig| {
+ let config: ExecutorProcessConfig = ExecutorProcessConfig {
+ // overriding default config producer with custom producer
+ // which has required S3 configuration options
+ override_config_producer:
Some(Arc::new(custom_session_config_with_s3_options)),
+ // overriding default runtime producer with custom producer
+ // which knows how to create S3 connections
+ override_runtime_producer: Some(Arc::new(|session_config:
&SessionConfig| {
custom_runtime_env_with_s3_support(session_config)
- }));
+ })),
+ ..Default::default()
+ };
start_executor_process(Arc::new(config)).await
}
diff --git a/examples/examples/custom-scheduler.rs
b/examples/examples/custom-scheduler.rs
index 30aeb3e3..9783ae28 100644
--- a/examples/examples/custom-scheduler.rs
+++ b/examples/examples/custom-scheduler.rs
@@ -15,52 +15,46 @@
// specific language governing permissions and limitations
// under the License.
-use anyhow::Result;
-use ballista_core::print_version;
+use ballista_core::error::BallistaError;
use ballista_examples::object_store::{
custom_session_config_with_s3_options,
custom_session_state_with_s3_support,
};
use ballista_scheduler::cluster::BallistaCluster;
-use ballista_scheduler::config::{Config, ResultExt, SchedulerConfig};
+use ballista_scheduler::config::SchedulerConfig;
use ballista_scheduler::scheduler_process::start_server;
use datafusion::prelude::SessionConfig;
+use std::net::AddrParseError;
use std::sync::Arc;
///
/// # Custom Ballista Scheduler
///
-/// This example demonstrates how to crate custom made ballista schedulers.
+/// This example demonstrates how to create custom ballista schedulers.
///
#[tokio::main]
-async fn main() -> Result<()> {
+async fn main() -> ballista_core::error::Result<()> {
let _ = env_logger::builder()
.filter_level(log::LevelFilter::Info)
.is_test(true)
.try_init();
- // parse options
- let (opt, _remaining_args) =
-
Config::including_optional_config_files(&["/etc/ballista/scheduler.toml"])
- .unwrap_or_exit();
+ let config: SchedulerConfig = SchedulerConfig {
+ // overriding default runtime producer with custom producer
+ // which knows how to create S3 connections
+ override_config_producer:
Some(Arc::new(custom_session_config_with_s3_options)),
+ // overriding default session builder, which has custom session
configuration
+ // runtime environment and session state.
+ override_session_builder: Some(Arc::new(|session_config:
SessionConfig| {
+ custom_session_state_with_s3_support(session_config)
+ })),
+ ..Default::default()
+ };
- if opt.version {
- print_version();
- std::process::exit(0);
- }
+ let addr = format!("{}:{}", config.bind_host, config.bind_port);
+ let addr = addr
+ .parse()
+ .map_err(|e: AddrParseError|
BallistaError::Configuration(e.to_string()))?;
- let addr = format!("{}:{}", opt.bind_host, opt.bind_port);
- let addr = addr.parse()?;
- let mut config: SchedulerConfig = opt.try_into()?;
-
- // overriding default runtime producer with custom producer
- // which knows how to create S3 connections
- config.override_config_producer =
- Some(Arc::new(custom_session_config_with_s3_options));
- // overriding default session builder, which has custom session
configuration
- // runtime environment and session state.
- config.override_session_builder = Some(Arc::new(|session_config:
SessionConfig| {
- custom_session_state_with_s3_support(session_config)
- }));
let cluster = BallistaCluster::new_from_config(&config).await?;
start_server(cluster, addr, Arc::new(config)).await?;
diff --git a/python/Cargo.toml b/python/Cargo.toml
index 747f330a..f7083822 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -25,7 +25,6 @@ description = "Apache Arrow Ballista Python Client"
readme = "README.md"
license = "Apache-2.0"
edition = "2021"
-rust-version = "1.72"
include = ["/src", "/ballista", "/LICENSE.txt", "pyproject.toml",
"Cargo.toml", "Cargo.lock"]
publish = false
@@ -33,15 +32,15 @@ publish = false
async-trait = "0.1.77"
ballista = { path = "../ballista/client", version = "0.12.0" }
ballista-core = { path = "../ballista/core", version = "0.12.0" }
-ballista-executor = { path = "../ballista/executor", version = "0.12.0" }
-ballista-scheduler = { path = "../ballista/scheduler", version = "0.12.0" }
+ballista-executor = { path = "../ballista/executor", version = "0.12.0",
default-features = false }
+ballista-scheduler = { path = "../ballista/scheduler", version = "0.12.0",
default-features = false }
datafusion = { version = "42", features = ["pyarrow", "avro"] }
datafusion-proto = { version = "42" }
datafusion-python = { version = "42" }
pyo3 = { version = "0.22", features = ["extension-module", "abi3",
"abi3-py38"] }
pyo3-log = "0.11.0"
-tokio = { version = "1.35", features = ["macros", "rt", "rt-multi-thread",
"sync"] }
+tokio = { version = "1.42", features = ["macros", "rt", "rt-multi-thread",
"sync"] }
[lib]
crate-type = ["cdylib"]
diff --git a/python/src/cluster.rs b/python/src/cluster.rs
index aa4260ce..848fc488 100644
--- a/python/src/cluster.rs
+++ b/python/src/cluster.rs
@@ -128,8 +128,9 @@ impl PyScheduler {
pub fn __repr__(&self) -> String {
format!(
- "BallistaScheduler(config={:?}, listening= {})",
- self.config,
+ "BallistaScheduler(listening address={}:{}, listening= {})",
+ self.config.bind_host,
+ self.config.bind_port,
self.handle.is_some()
)
}
@@ -246,18 +247,19 @@ impl PyExecutor {
self.config.bind_host,
self.config.port,
self.config.scheduler_host,
- self.config.scheduler_port
+ self.config.scheduler_port,
),
}
}
pub fn __repr__(&self) -> String {
format!(
- "BallistaExecutor(address={}:{}, scheduler={}:{}, listening={})",
+ "BallistaExecutor(address={}:{}, scheduler={}:{},
concurrent_tasks={}, listening={})",
self.config.bind_host,
self.config.port,
self.config.scheduler_host,
self.config.scheduler_port,
+ self.config.concurrent_tasks,
self.handle.is_some()
)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]