This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 80c2c56c chore: dependency cleanup (#1150)
80c2c56c is described below

commit 80c2c56cf00448dac3dae3e66a80c2b4450ad17b
Author: Marko Milenković <[email protected]>
AuthorDate: Sat Dec 14 13:29:25 2024 +0000

    chore: dependency cleanup (#1150)
---
 Cargo.toml                                         |   6 +-
 ballista-cli/Cargo.toml                            |   1 -
 ballista/client/Cargo.toml                         |   8 +-
 ballista/core/Cargo.toml                           |   9 +-
 ballista/core/src/config.rs                        |  25 ++--
 ballista/core/src/diagram.rs                       | 148 +++++++++++++++++++++
 ballista/core/src/error.rs                         |  48 +------
 ballista/core/src/lib.rs                           |   5 +-
 ballista/core/src/utils.rs                         | 129 +-----------------
 ballista/executor/Cargo.toml                       |  13 +-
 ballista/executor/build.rs                         |   8 +-
 ballista/executor/src/bin/main.rs                  |   3 +-
 ballista/executor/src/config.rs                    |   1 +
 ballista/executor/src/execution_loop.rs            |  20 ++-
 ballista/executor/src/executor.rs                  |   4 +-
 ballista/executor/src/executor_process.rs          |  39 ++++--
 ballista/executor/src/flight_service.rs            |  12 +-
 ballista/executor/src/lib.rs                       |   1 +
 ballista/scheduler/Cargo.toml                      |  22 ++-
 ballista/scheduler/build.rs                        |   4 +-
 ballista/scheduler/src/bin/main.rs                 |  11 +-
 ballista/scheduler/src/cluster/memory.rs           |   3 +-
 ballista/scheduler/src/cluster/mod.rs              |   7 +-
 ballista/scheduler/src/config.rs                   |  72 ++--------
 ballista/scheduler/src/scheduler_process.rs        |   8 +-
 .../src/scheduler_server/query_stage_scheduler.rs  |   5 -
 ballista/scheduler/src/state/task_manager.rs       |   4 +-
 benchmarks/Cargo.toml                              |   1 -
 examples/Cargo.toml                                |   6 +-
 examples/examples/custom-executor.rs               |  36 ++---
 examples/examples/custom-scheduler.rs              |  46 +++----
 python/Cargo.toml                                  |   7 +-
 python/src/cluster.rs                              |  10 +-
 33 files changed, 320 insertions(+), 402 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index a68222a2..f9206458 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -21,7 +21,6 @@ members = ["ballista-cli", "ballista/client", "ballista/core", "ballista/executo
 resolver = "2"
 
 [workspace.dependencies]
-anyhow = "1"
 arrow = { version = "53", features = ["ipc_compression"] }
 arrow-flight = { version = "53", features = ["flight-sql-experimental"] }
 clap = { version = "4.5", features = ["derive", "cargo"] }
@@ -40,9 +39,9 @@ tonic-build = { version = "0.12", default-features = false, features = [
     "transport",
     "prost"
 ] }
-tracing = "0.1.36"
+tracing = "0.1"
 tracing-appender = "0.2.2"
-tracing-subscriber = { version = "0.3.15", features = ["env-filter"] }
+tracing-subscriber = { version = "0.3", features = ["env-filter"] }
 ctor = { version = "0.2" }
 mimalloc = { version = "0.1" }
 
@@ -58,7 +57,6 @@ dashmap = { version = "6.1" }
 async-trait = { version = "0.1.4" }
 serde = { version = "1.0" }
 tokio-stream = { version = "0.1" }
-parse_arg = { version = "0.1" }
 url = { version = "2.5" }
 
 # cargo build --profile release-lto
diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml
index 2f1ddeb0..9b527e56 100644
--- a/ballista-cli/Cargo.toml
+++ b/ballista-cli/Cargo.toml
@@ -25,7 +25,6 @@ keywords = ["ballista", "cli"]
 license = "Apache-2.0"
 homepage = "https://github.com/apache/arrow-ballista"
 repository = "https://github.com/apache/arrow-ballista"
-rust-version = "1.72"
 readme = "README.md"
 
 [dependencies]
diff --git a/ballista/client/Cargo.toml b/ballista/client/Cargo.toml
index 9614412f..e462367a 100644
--- a/ballista/client/Cargo.toml
+++ b/ballista/client/Cargo.toml
@@ -25,7 +25,6 @@ repository = "https://github.com/apache/arrow-ballista"
 readme = "README.md"
 authors = ["Apache DataFusion <[email protected]>"]
 edition = "2021"
-rust-version = "1.72"
 
 [dependencies]
 async-trait = { workspace = true }
@@ -33,11 +32,8 @@ ballista-core = { path = "../core", version = "0.12.0" }
 ballista-executor = { path = "../executor", version = "0.12.0", optional = true }
 ballista-scheduler = { path = "../scheduler", version = "0.12.0", optional = true }
 datafusion = { workspace = true }
-datafusion-proto = { workspace = true }
-futures = { workspace = true }
 log = { workspace = true }
-parking_lot = { workspace = true }
-tempfile = { workspace = true }
+
 tokio = { workspace = true }
 url = { workspace = true }
 
@@ -45,8 +41,10 @@ url = { workspace = true }
 ballista-executor = { path = "../executor", version = "0.12.0" }
 ballista-scheduler = { path = "../scheduler", version = "0.12.0" }
 ctor = { workspace = true }
+datafusion-proto = { workspace = true }
 env_logger = { workspace = true }
 rstest = { version = "0.23" }
+tempfile = { workspace = true }
 tonic = { workspace = true }
 
 [features]
diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml
index 80a3d102..1bf88858 100644
--- a/ballista/core/Cargo.toml
+++ b/ballista/core/Cargo.toml
@@ -34,25 +34,24 @@ exclude = ["*.proto"]
 rustc-args = ["--cfg", "docsrs"]
 
 [features]
+build-binary = ["configure_me", "clap"]
 docsrs = []
 # Used for testing ONLY: causes all values to hash to the same value (test for collisions)
 force_hash_collisions = ["datafusion/force_hash_collisions"]
 
-
 [dependencies]
 arrow-flight = { workspace = true }
 async-trait = { workspace = true }
 chrono = { version = "0.4", default-features = false }
-clap = { workspace = true }
+clap = { workspace = true, optional = true }
+configure_me = { workspace = true, optional = true }
 datafusion = { workspace = true }
 datafusion-proto = { workspace = true }
 datafusion-proto-common = { workspace = true }
 futures = { workspace = true }
-
 itertools = "0.13"
 log = { workspace = true }
 md-5 = { version = "^0.10.0" }
-parse_arg = { workspace = true }
 prost = { workspace = true }
 prost-types = { workspace = true }
 rand = { workspace = true }
@@ -66,5 +65,5 @@ url = { workspace = true }
 tempfile = { workspace = true }
 
 [build-dependencies]
-rustc_version = "0.4.0"
+rustc_version = "0.4.1"
 tonic-build = { workspace = true }
diff --git a/ballista/core/src/config.rs b/ballista/core/src/config.rs
index e00cd115..cb7f7c5d 100644
--- a/ballista/core/src/config.rs
+++ b/ballista/core/src/config.rs
@@ -18,8 +18,6 @@
 
 //! Ballista configuration
 
-use clap::ValueEnum;
-use core::fmt;
 use std::collections::HashMap;
 use std::result;
 
@@ -252,30 +250,33 @@ impl datafusion::config::ConfigExtension for BallistaConfig {
 
 // an enum used to configure the scheduler policy
 // needs to be visible to code generated by configure_me
-#[derive(Clone, ValueEnum, Copy, Debug, serde::Deserialize, Default)]
+#[derive(Clone, Copy, Debug, serde::Deserialize, Default)]
+#[cfg_attr(feature = "build-binary", derive(clap::ValueEnum))]
 pub enum TaskSchedulingPolicy {
     #[default]
     PullStaged,
     PushStaged,
 }
 
+#[cfg(feature = "build-binary")]
 impl std::str::FromStr for TaskSchedulingPolicy {
     type Err = String;
 
     fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
-        ValueEnum::from_str(s, true)
+        clap::ValueEnum::from_str(s, true)
     }
 }
-
-impl parse_arg::ParseArgFromStr for TaskSchedulingPolicy {
-    fn describe_type<W: fmt::Write>(mut writer: W) -> fmt::Result {
+#[cfg(feature = "build-binary")]
+impl configure_me::parse_arg::ParseArgFromStr for TaskSchedulingPolicy {
+    fn describe_type<W: core::fmt::Write>(mut writer: W) -> core::fmt::Result {
         write!(writer, "The scheduler policy for the scheduler")
     }
 }
 
 // an enum used to configure the log rolling policy
 // needs to be visible to code generated by configure_me
-#[derive(Clone, ValueEnum, Copy, Debug, serde::Deserialize, Default)]
+#[derive(Clone, Copy, Debug, serde::Deserialize, Default)]
+#[cfg_attr(feature = "build-binary", derive(clap::ValueEnum))]
 pub enum LogRotationPolicy {
     Minutely,
     Hourly,
@@ -284,16 +285,18 @@ pub enum LogRotationPolicy {
     Never,
 }
 
+#[cfg(feature = "build-binary")]
 impl std::str::FromStr for LogRotationPolicy {
     type Err = String;
 
     fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
-        ValueEnum::from_str(s, true)
+        clap::ValueEnum::from_str(s, true)
     }
 }
 
-impl parse_arg::ParseArgFromStr for LogRotationPolicy {
-    fn describe_type<W: fmt::Write>(mut writer: W) -> fmt::Result {
+#[cfg(feature = "build-binary")]
+impl configure_me::parse_arg::ParseArgFromStr for LogRotationPolicy {
+    fn describe_type<W: core::fmt::Write>(mut writer: W) -> core::fmt::Result {
         write!(writer, "The log rotation policy")
     }
 }
diff --git a/ballista/core/src/diagram.rs b/ballista/core/src/diagram.rs
new file mode 100644
index 00000000..9ef0da98
--- /dev/null
+++ b/ballista/core/src/diagram.rs
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::Result;
+use crate::execution_plans::{ShuffleWriterExec, UnresolvedShuffleExec};
+
+use datafusion::datasource::physical_plan::{CsvExec, ParquetExec};
+use datafusion::physical_plan::aggregates::AggregateExec;
+use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
+use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use datafusion::physical_plan::filter::FilterExec;
+use datafusion::physical_plan::joins::HashJoinExec;
+use datafusion::physical_plan::projection::ProjectionExec;
+use datafusion::physical_plan::sorts::sort::SortExec;
+use datafusion::physical_plan::ExecutionPlan;
+use std::fs::File;
+use std::io::{BufWriter, Write};
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+
+pub fn produce_diagram(filename: &str, stages: &[Arc<ShuffleWriterExec>]) -> Result<()> {
+    let write_file = File::create(filename)?;
+    let mut w = BufWriter::new(&write_file);
+    writeln!(w, "digraph G {{")?;
+
+    // draw stages and entities
+    for stage in stages {
+        writeln!(w, "\tsubgraph cluster{} {{", stage.stage_id())?;
+        writeln!(w, "\t\tlabel = \"Stage {}\";", stage.stage_id())?;
+        let mut id = AtomicUsize::new(0);
+        build_exec_plan_diagram(
+            &mut w,
+            stage.children()[0].as_ref(),
+            stage.stage_id(),
+            &mut id,
+            true,
+        )?;
+        writeln!(w, "\t}}")?;
+    }
+
+    // draw relationships
+    for stage in stages {
+        let mut id = AtomicUsize::new(0);
+        build_exec_plan_diagram(
+            &mut w,
+            stage.children()[0].as_ref(),
+            stage.stage_id(),
+            &mut id,
+            false,
+        )?;
+    }
+
+    write!(w, "}}")?;
+    Ok(())
+}
+
+fn build_exec_plan_diagram(
+    w: &mut BufWriter<&File>,
+    plan: &dyn ExecutionPlan,
+    stage_id: usize,
+    id: &mut AtomicUsize,
+    draw_entity: bool,
+) -> Result<usize> {
+    let operator_str = if plan.as_any().downcast_ref::<AggregateExec>().is_some() {
+        "AggregateExec"
+    } else if plan.as_any().downcast_ref::<SortExec>().is_some() {
+        "SortExec"
+    } else if plan.as_any().downcast_ref::<ProjectionExec>().is_some() {
+        "ProjectionExec"
+    } else if plan.as_any().downcast_ref::<HashJoinExec>().is_some() {
+        "HashJoinExec"
+    } else if plan.as_any().downcast_ref::<ParquetExec>().is_some() {
+        "ParquetExec"
+    } else if plan.as_any().downcast_ref::<CsvExec>().is_some() {
+        "CsvExec"
+    } else if plan.as_any().downcast_ref::<FilterExec>().is_some() {
+        "FilterExec"
+    } else if plan.as_any().downcast_ref::<ShuffleWriterExec>().is_some() {
+        "ShuffleWriterExec"
+    } else if plan
+        .as_any()
+        .downcast_ref::<UnresolvedShuffleExec>()
+        .is_some()
+    {
+        "UnresolvedShuffleExec"
+    } else if plan
+        .as_any()
+        .downcast_ref::<CoalesceBatchesExec>()
+        .is_some()
+    {
+        "CoalesceBatchesExec"
+    } else if plan
+        .as_any()
+        .downcast_ref::<CoalescePartitionsExec>()
+        .is_some()
+    {
+        "CoalescePartitionsExec"
+    } else {
+        println!("Unknown: {plan:?}");
+        "Unknown"
+    };
+
+    let node_id = id.load(Ordering::SeqCst);
+    id.store(node_id + 1, Ordering::SeqCst);
+
+    if draw_entity {
+        writeln!(
+            w,
+            "\t\tstage_{stage_id}_exec_{node_id} [shape=box, label=\"{operator_str}\"];"
+        )?;
+    }
+    for child in plan.children() {
+        if let Some(shuffle) = child.as_any().downcast_ref::<UnresolvedShuffleExec>() {
+            if !draw_entity {
+                writeln!(
+                    w,
+                    "\tstage_{}_exec_1 -> stage_{}_exec_{};",
+                    shuffle.stage_id, stage_id, node_id
+                )?;
+            }
+        } else {
+            // relationships within same entity
+            let child_id =
+                build_exec_plan_diagram(w, child.as_ref(), stage_id, id, draw_entity)?;
+            if draw_entity {
+                writeln!(
+                    w,
+                    "\t\tstage_{stage_id}_exec_{child_id} -> stage_{stage_id}_exec_{node_id};"
+                )?;
+            }
+        }
+    }
+    Ok(node_id)
+}
diff --git a/ballista/core/src/error.rs b/ballista/core/src/error.rs
index cbdd90a7..05a706cc 100644
--- a/ballista/core/src/error.rs
+++ b/ballista/core/src/error.rs
@@ -37,15 +37,11 @@ pub enum BallistaError {
     NotImplemented(String),
     General(String),
     Internal(String),
+    Configuration(String),
     ArrowError(ArrowError),
     DataFusionError(DataFusionError),
     SqlError(parser::ParserError),
     IoError(io::Error),
-    // ReqwestError(reqwest::Error),
-    // HttpError(http::Error),
-    // KubeAPIError(kube::error::Error),
-    // KubeAPIRequestError(k8s_openapi::RequestError),
-    // KubeAPIResponseError(k8s_openapi::ResponseError),
     TonicError(tonic::transport::Error),
     GrpcError(tonic::Status),
     GrpcConnectionError(String),
@@ -112,36 +108,6 @@ impl From<io::Error> for BallistaError {
     }
 }
 
-// impl From<reqwest::Error> for BallistaError {
-//     fn from(e: reqwest::Error) -> Self {
-//         BallistaError::ReqwestError(e)
-//     }
-// }
-//
-// impl From<http::Error> for BallistaError {
-//     fn from(e: http::Error) -> Self {
-//         BallistaError::HttpError(e)
-//     }
-// }
-
-// impl From<kube::error::Error> for BallistaError {
-//     fn from(e: kube::error::Error) -> Self {
-//         BallistaError::KubeAPIError(e)
-//     }
-// }
-
-// impl From<k8s_openapi::RequestError> for BallistaError {
-//     fn from(e: k8s_openapi::RequestError) -> Self {
-//         BallistaError::KubeAPIRequestError(e)
-//     }
-// }
-
-// impl From<k8s_openapi::ResponseError> for BallistaError {
-//     fn from(e: k8s_openapi::ResponseError) -> Self {
-//         BallistaError::KubeAPIResponseError(e)
-//     }
-// }
-
 impl From<tonic::transport::Error> for BallistaError {
     fn from(e: tonic::transport::Error) -> Self {
         BallistaError::TonicError(e)
@@ -191,15 +157,6 @@ impl Display for BallistaError {
             }
             BallistaError::SqlError(ref desc) => write!(f, "SQL error: {desc}"),
             BallistaError::IoError(ref desc) => write!(f, "IO error: {desc}"),
-            // BallistaError::ReqwestError(ref desc) => write!(f, "Reqwest error: {}", desc),
-            // BallistaError::HttpError(ref desc) => write!(f, "HTTP error: {}", desc),
-            // BallistaError::KubeAPIError(ref desc) => write!(f, "Kube API error: {}", desc),
-            // BallistaError::KubeAPIRequestError(ref desc) => {
-            //     write!(f, "KubeAPI request error: {}", desc)
-            // }
-            // BallistaError::KubeAPIResponseError(ref desc) => {
-            //     write!(f, "KubeAPI response error: {}", desc)
-            // }
             BallistaError::TonicError(desc) => write!(f, "Tonic error: {desc}"),
             BallistaError::GrpcError(desc) => write!(f, "Grpc error: {desc}"),
             BallistaError::GrpcConnectionError(desc) => {
@@ -220,6 +177,9 @@ impl Display for BallistaError {
                 )
             }
             BallistaError::Cancelled => write!(f, "Task cancelled"),
+            BallistaError::Configuration(desc) => {
+                write!(f, "Configuration error: {desc}")
+            }
         }
     }
 }
diff --git a/ballista/core/src/lib.rs b/ballista/core/src/lib.rs
index c2d92d35..7864d56e 100644
--- a/ballista/core/src/lib.rs
+++ b/ballista/core/src/lib.rs
@@ -29,15 +29,14 @@ pub fn print_version() {
 pub mod client;
 pub mod config;
 pub mod consistent_hash;
+pub mod diagram;
 pub mod error;
 pub mod event_loop;
 pub mod execution_plans;
 pub mod extension;
 pub mod registry;
-pub mod utils;
-
-#[macro_use]
 pub mod serde;
+pub mod utils;
 
 ///
 /// [RuntimeProducer] is a factory which creates runtime [RuntimeEnv]
diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs
index 1506c2bb..14eeb9a2 100644
--- a/ballista/core/src/utils.rs
+++ b/ballista/core/src/utils.rs
@@ -17,9 +17,7 @@
 
 use crate::config::BallistaConfig;
 use crate::error::{BallistaError, Result};
-use crate::execution_plans::{
-    DistributedQueryExec, ShuffleWriterExec, UnresolvedShuffleExec,
-};
+use crate::execution_plans::DistributedQueryExec;
 
 use crate::extension::SessionConfigExt;
 use crate::serde::scheduler::PartitionStats;
@@ -32,29 +30,19 @@ use datafusion::arrow::ipc::writer::StreamWriter;
 use datafusion::arrow::ipc::CompressionType;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::common::tree_node::{TreeNode, TreeNodeVisitor};
-use datafusion::datasource::physical_plan::{CsvExec, ParquetExec};
 use datafusion::error::DataFusionError;
 use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionState};
 use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
 use datafusion::execution::session_state::SessionStateBuilder;
 use datafusion::logical_expr::{DdlStatement, LogicalPlan, TableScan};
-use datafusion::physical_plan::aggregates::AggregateExec;
-use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
-use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::empty::EmptyExec;
-use datafusion::physical_plan::filter::FilterExec;
-use datafusion::physical_plan::joins::HashJoinExec;
 use datafusion::physical_plan::metrics::MetricsSet;
-use datafusion::physical_plan::projection::ProjectionExec;
-use datafusion::physical_plan::sorts::sort::SortExec;
 use datafusion::physical_plan::{metrics, ExecutionPlan, RecordBatchStream};
 use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
 use datafusion_proto::logical_plan::{AsLogicalPlan, LogicalExtensionCodec};
 use futures::StreamExt;
 use log::error;
-use std::io::{BufWriter, Write};
 use std::marker::PhantomData;
-use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
 use std::{fs::File, pin::Pin};
@@ -129,121 +117,6 @@ pub async fn collect_stream(
     Ok(batches)
 }
 
-pub fn produce_diagram(filename: &str, stages: &[Arc<ShuffleWriterExec>]) -> Result<()> {
-    let write_file = File::create(filename)?;
-    let mut w = BufWriter::new(&write_file);
-    writeln!(w, "digraph G {{")?;
-
-    // draw stages and entities
-    for stage in stages {
-        writeln!(w, "\tsubgraph cluster{} {{", stage.stage_id())?;
-        writeln!(w, "\t\tlabel = \"Stage {}\";", stage.stage_id())?;
-        let mut id = AtomicUsize::new(0);
-        build_exec_plan_diagram(
-            &mut w,
-            stage.children()[0].as_ref(),
-            stage.stage_id(),
-            &mut id,
-            true,
-        )?;
-        writeln!(w, "\t}}")?;
-    }
-
-    // draw relationships
-    for stage in stages {
-        let mut id = AtomicUsize::new(0);
-        build_exec_plan_diagram(
-            &mut w,
-            stage.children()[0].as_ref(),
-            stage.stage_id(),
-            &mut id,
-            false,
-        )?;
-    }
-
-    write!(w, "}}")?;
-    Ok(())
-}
-
-fn build_exec_plan_diagram(
-    w: &mut BufWriter<&File>,
-    plan: &dyn ExecutionPlan,
-    stage_id: usize,
-    id: &mut AtomicUsize,
-    draw_entity: bool,
-) -> Result<usize> {
-    let operator_str = if plan.as_any().downcast_ref::<AggregateExec>().is_some() {
-        "AggregateExec"
-    } else if plan.as_any().downcast_ref::<SortExec>().is_some() {
-        "SortExec"
-    } else if plan.as_any().downcast_ref::<ProjectionExec>().is_some() {
-        "ProjectionExec"
-    } else if plan.as_any().downcast_ref::<HashJoinExec>().is_some() {
-        "HashJoinExec"
-    } else if plan.as_any().downcast_ref::<ParquetExec>().is_some() {
-        "ParquetExec"
-    } else if plan.as_any().downcast_ref::<CsvExec>().is_some() {
-        "CsvExec"
-    } else if plan.as_any().downcast_ref::<FilterExec>().is_some() {
-        "FilterExec"
-    } else if plan.as_any().downcast_ref::<ShuffleWriterExec>().is_some() {
-        "ShuffleWriterExec"
-    } else if plan
-        .as_any()
-        .downcast_ref::<UnresolvedShuffleExec>()
-        .is_some()
-    {
-        "UnresolvedShuffleExec"
-    } else if plan
-        .as_any()
-        .downcast_ref::<CoalesceBatchesExec>()
-        .is_some()
-    {
-        "CoalesceBatchesExec"
-    } else if plan
-        .as_any()
-        .downcast_ref::<CoalescePartitionsExec>()
-        .is_some()
-    {
-        "CoalescePartitionsExec"
-    } else {
-        println!("Unknown: {plan:?}");
-        "Unknown"
-    };
-
-    let node_id = id.load(Ordering::SeqCst);
-    id.store(node_id + 1, Ordering::SeqCst);
-
-    if draw_entity {
-        writeln!(
-            w,
-            "\t\tstage_{stage_id}_exec_{node_id} [shape=box, label=\"{operator_str}\"];"
-        )?;
-    }
-    for child in plan.children() {
-        if let Some(shuffle) = child.as_any().downcast_ref::<UnresolvedShuffleExec>() {
-            if !draw_entity {
-                writeln!(
-                    w,
-                    "\tstage_{}_exec_1 -> stage_{}_exec_{};",
-                    shuffle.stage_id, stage_id, node_id
-                )?;
-            }
-        } else {
-            // relationships within same entity
-            let child_id =
-                build_exec_plan_diagram(w, child.as_ref(), stage_id, id, draw_entity)?;
-            if draw_entity {
-                writeln!(
-                    w,
-                    "\t\tstage_{stage_id}_exec_{child_id} -> stage_{stage_id}_exec_{node_id};"
-                )?;
-            }
-        }
-    }
-    Ok(node_id)
-}
-
 pub struct BallistaQueryPlanner<T: AsLogicalPlan> {
     scheduler_url: String,
     config: BallistaConfig,
diff --git a/ballista/executor/Cargo.toml b/ballista/executor/Cargo.toml
index abe256eb..6a2dfa61 100644
--- a/ballista/executor/Cargo.toml
+++ b/ballista/executor/Cargo.toml
@@ -32,17 +32,18 @@ executor = "executor_config_spec.toml"
 [[bin]]
 name = "ballista-executor"
 path = "src/bin/main.rs"
+required-features = ["build-binary"]
 
 [features]
-default = ["mimalloc"]
+build-binary = ["configure_me", "tracing-subscriber", "tracing-appender", "tracing", "ballista-core/build-binary"]
+default = ["build-binary", "mimalloc"]
 
 [dependencies]
-anyhow = { workspace = true }
 arrow = { workspace = true }
 arrow-flight = { workspace = true }
 async-trait = { workspace = true }
 ballista-core = { path = "../core", version = "0.12.0" }
-configure_me = { workspace = true }
+configure_me = { workspace = true, optional = true }
 dashmap = { workspace = true }
 datafusion = { workspace = true }
 datafusion-proto = { workspace = true }
@@ -60,9 +61,9 @@ tokio = { workspace = true, features = [
 ] }
 tokio-stream = { workspace = true, features = ["net"] }
 tonic = { workspace = true }
-tracing = { workspace = true }
-tracing-appender = { workspace = true }
-tracing-subscriber = { workspace = true }
+tracing = { workspace = true, optional = true }
+tracing-appender = { workspace = true, optional = true }
+tracing-subscriber = { workspace = true, optional = true }
 uuid = { workspace = true }
 
 [dev-dependencies]
diff --git a/ballista/executor/build.rs b/ballista/executor/build.rs
index 7d2b9b87..21ce2d8f 100644
--- a/ballista/executor/build.rs
+++ b/ballista/executor/build.rs
@@ -15,10 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-extern crate configure_me_codegen;
-
 fn main() -> Result<(), String> {
+    #[cfg(feature = "build-binary")]
     println!("cargo:rerun-if-changed=executor_config_spec.toml");
+    #[cfg(feature = "build-binary")]
     configure_me_codegen::build_script_auto()
-        .map_err(|e| format!("configure_me code generation failed: {e}"))
+        .map_err(|e| format!("configure_me code generation failed: {e}"))?;
+
+    Ok(())
 }
diff --git a/ballista/executor/src/bin/main.rs b/ballista/executor/src/bin/main.rs
index 2ab1a90c..18abb996 100644
--- a/ballista/executor/src/bin/main.rs
+++ b/ballista/executor/src/bin/main.rs
@@ -17,7 +17,6 @@
 
 //! Ballista Rust executor binary.
 
-use anyhow::Result;
 use ballista_core::config::LogRotationPolicy;
 use ballista_core::print_version;
 use ballista_executor::config::prelude::*;
@@ -33,7 +32,7 @@ use tracing_subscriber::EnvFilter;
 static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
 
 #[tokio::main]
-async fn main() -> Result<()> {
+async fn main() -> ballista_core::error::Result<()> {
     // parse command-line arguments
     let (opt, _remaining_args) =
         Config::including_optional_config_files(&["/etc/ballista/executor.toml"])
diff --git a/ballista/executor/src/config.rs b/ballista/executor/src/config.rs
index 65fa9d46..91b54732 100644
--- a/ballista/executor/src/config.rs
+++ b/ballista/executor/src/config.rs
@@ -21,6 +21,7 @@ use crate::executor_process::ExecutorProcessConfig;
 
 // Ideally we would use the include_config macro from configure_me, but then we cannot use
 // #[allow(clippy::all)] to silence clippy warnings from the generated code
+
 include!(concat!(env!("OUT_DIR"), "/executor_configure_me_config.rs"));
 
 impl TryFrom<Config> for ExecutorProcessConfig {
diff --git a/ballista/executor/src/execution_loop.rs b/ballista/executor/src/execution_loop.rs
index 649b366b..2094425d 100644
--- a/ballista/executor/src/execution_loop.rs
+++ b/ballista/executor/src/execution_loop.rs
@@ -77,16 +77,14 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
         let task_status: Vec<TaskStatus> =
             sample_tasks_status(&mut task_status_receiver).await;
 
-        let poll_work_result: anyhow::Result<
-            tonic::Response<PollWorkResult>,
-            tonic::Status,
-        > = scheduler
-            .poll_work(PollWorkParams {
-                metadata: Some(executor.metadata.clone()),
-                num_free_slots: available_task_slots.available_permits() as u32,
-                task_status,
-            })
-            .await;
+        let poll_work_result: Result<tonic::Response<PollWorkResult>, tonic::Status> =
+            scheduler
+                .poll_work(PollWorkParams {
+                    metadata: Some(executor.metadata.clone()),
+                    num_free_slots: available_task_slots.available_permits() as u32,
+                    task_status,
+                })
+                .await;
 
         match poll_work_result {
             Ok(result) => {
@@ -274,7 +272,7 @@ async fn sample_tasks_status(
 
     loop {
         match task_status_receiver.try_recv() {
-            anyhow::Result::Ok(status) => {
+            Result::Ok(status) => {
                 task_status.push(status);
             }
             Err(TryRecvError::Empty) => {
diff --git a/ballista/executor/src/executor.rs 
b/ballista/executor/src/executor.rs
index 1ebf3e56..1b029e17 100644
--- a/ballista/executor/src/executor.rs
+++ b/ballista/executor/src/executor.rs
@@ -215,13 +215,13 @@ impl Executor {
 mod test {
     use crate::execution_engine::DefaultQueryStageExec;
     use crate::executor::Executor;
-    use arrow::datatypes::{Schema, SchemaRef};
-    use arrow::record_batch::RecordBatch;
     use ballista_core::execution_plans::ShuffleWriterExec;
     use ballista_core::serde::protobuf::ExecutorRegistration;
     use ballista_core::serde::scheduler::PartitionId;
     use ballista_core::utils::default_config_producer;
     use ballista_core::RuntimeProducer;
+    use datafusion::arrow::datatypes::{Schema, SchemaRef};
+    use datafusion::arrow::record_batch::RecordBatch;
     use datafusion::error::{DataFusionError, Result};
     use datafusion::execution::context::TaskContext;
 
diff --git a/ballista/executor/src/executor_process.rs b/ballista/executor/src/executor_process.rs
index ed690288..e350f391 100644
--- a/ballista/executor/src/executor_process.rs
+++ b/ballista/executor/src/executor_process.rs
@@ -22,7 +22,6 @@ use std::sync::atomic::Ordering;
 use std::sync::Arc;
 use std::time::{Duration, Instant, UNIX_EPOCH};
 
-use anyhow::{Context, Result};
 use arrow_flight::flight_service_server::FlightServiceServer;
 use ballista_core::registry::BallistaFunctionRegistry;
 use datafusion_proto::logical_plan::LogicalExtensionCodec;
@@ -148,11 +147,13 @@ impl Default for ExecutorProcessConfig {
     }
 }
 
-pub async fn start_executor_process(opt: Arc<ExecutorProcessConfig>) -> Result<()> {
+pub async fn start_executor_process(
+    opt: Arc<ExecutorProcessConfig>,
+) -> ballista_core::error::Result<()> {
     let addr = format!("{}:{}", opt.bind_host, opt.port);
-    let addr = addr
-        .parse()
-        .with_context(|| format!("Could not parse address: {addr}"))?;
+    let addr = addr.parse().map_err(|e: std::net::AddrParseError| {
+        BallistaError::Configuration(e.to_string())
+    })?;
 
     let scheduler_host = opt.scheduler_host.clone();
     let scheduler_port = opt.scheduler_port;
@@ -237,7 +238,11 @@ pub async fn start_executor_process(opt: Arc<ExecutorProcessConfig>) -> Result<(
     let connection = if connect_timeout == 0 {
         create_grpc_client_connection(scheduler_url)
             .await
-            .context("Could not connect to scheduler")
+            .map_err(|_| {
+                BallistaError::GrpcConnectionError(
+                    "Could not connect to scheduler".to_string(),
+                )
+            })
     } else {
         // this feature was added to support docker-compose so that we can have the executor
         // wait for the scheduler to start, or at least run for 10 seconds before failing so
@@ -249,8 +254,11 @@ pub async fn start_executor_process(opt: 
Arc<ExecutorProcessConfig>) -> Result<(
         {
             match create_grpc_client_connection(scheduler_url.clone())
                 .await
-                .context("Could not connect to scheduler")
-            {
+                .map_err(|_| {
+                    BallistaError::GrpcConnectionError(
+                        "Could not connect to scheduler".to_string(),
+                    )
+                }) {
                 Ok(conn) => {
                     info!("Connected to scheduler at {}", scheduler_url);
                     x = Some(conn);
@@ -268,8 +276,7 @@ pub async fn start_executor_process(opt: 
Arc<ExecutorProcessConfig>) -> Result<(
             Some(conn) => Ok(conn),
             _ => Err(BallistaError::General(format!(
                 "Timed out attempting to connect to scheduler at 
{scheduler_url}"
-            ))
-            .into()),
+            ))),
         }
     }?;
 
@@ -489,7 +496,10 @@ async fn check_services(
 
 /// This function will be scheduled periodically for cleanup the job shuffle 
data left on the executor.
 /// Only directories will be checked cleaned.
-async fn clean_shuffle_data_loop(work_dir: &str, seconds: u64) -> Result<()> {
+async fn clean_shuffle_data_loop(
+    work_dir: &str,
+    seconds: u64,
+) -> ballista_core::error::Result<()> {
     let mut dir = fs::read_dir(work_dir).await?;
     let mut to_deleted = Vec::new();
     while let Some(child) = dir.next_entry().await? {
@@ -527,7 +537,7 @@ async fn clean_shuffle_data_loop(work_dir: &str, seconds: 
u64) -> Result<()> {
 }
 
 /// This function will clean up all shuffle data on this executor
-async fn clean_all_shuffle_data(work_dir: &str) -> Result<()> {
+async fn clean_all_shuffle_data(work_dir: &str) -> 
ballista_core::error::Result<()> {
     let mut dir = fs::read_dir(work_dir).await?;
     let mut to_deleted = Vec::new();
     while let Some(child) = dir.next_entry().await? {
@@ -552,7 +562,10 @@ async fn clean_all_shuffle_data(work_dir: &str) -> 
Result<()> {
 
 /// Determines if a directory contains files newer than the cutoff time.
 /// If return true, it means the directory contains files newer than the 
cutoff time. It satisfy the ttl and should not be deleted.
-pub async fn satisfy_dir_ttl(dir: DirEntry, ttl_seconds: u64) -> Result<bool> {
+pub async fn satisfy_dir_ttl(
+    dir: DirEntry,
+    ttl_seconds: u64,
+) -> ballista_core::error::Result<bool> {
     let cutoff = get_time_before(ttl_seconds);
 
     let mut to_check = vec![dir];
diff --git a/ballista/executor/src/flight_service.rs 
b/ballista/executor/src/flight_service.rs
index a96a752c..939b5a8f 100644
--- a/ballista/executor/src/flight_service.rs
+++ b/ballista/executor/src/flight_service.rs
@@ -17,24 +17,24 @@
 
 //! Implementation of the Apache Arrow Flight protocol that wraps an executor.
 
-use arrow::ipc::reader::StreamReader;
+use datafusion::arrow::ipc::reader::StreamReader;
 use std::convert::TryFrom;
 use std::fs::File;
 use std::pin::Pin;
 
-use arrow::ipc::CompressionType;
 use arrow_flight::encode::FlightDataEncoderBuilder;
 use arrow_flight::error::FlightError;
 use ballista_core::error::BallistaError;
 use ballista_core::serde::decode_protobuf;
 use ballista_core::serde::scheduler::Action as BallistaAction;
+use datafusion::arrow::ipc::CompressionType;
 
-use arrow::ipc::writer::IpcWriteOptions;
 use arrow_flight::{
     flight_service_server::FlightService, Action, ActionType, Criteria, Empty,
     FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, 
HandshakeResponse,
     PollInfo, PutResult, SchemaResult, Ticket,
 };
+use datafusion::arrow::ipc::writer::IpcWriteOptions;
 use datafusion::arrow::{error::ArrowError, record_batch::RecordBatch};
 use futures::{Stream, StreamExt, TryStreamExt};
 use log::{debug, info};
@@ -45,7 +45,6 @@ use tokio::{sync::mpsc::Sender, task};
 use tokio_stream::wrappers::ReceiverStream;
 use tonic::metadata::MetadataValue;
 use tonic::{Request, Response, Status, Streaming};
-use tracing::warn;
 
 /// Service implementing the Apache Arrow Flight Protocol
 #[derive(Clone)]
@@ -103,7 +102,10 @@ impl FlightService for BallistaFlightService {
                 let schema = reader.schema();
                 task::spawn_blocking(move || {
                     if let Err(e) = read_partition(reader, tx) {
-                        warn!(error = %e, "error streaming shuffle partition");
+                        log::warn!(
+                            "error streaming shuffle partition: {}",
+                            e.to_string()
+                        );
                     }
                 });
 
diff --git a/ballista/executor/src/lib.rs b/ballista/executor/src/lib.rs
index f0284cbd..23e68f85 100644
--- a/ballista/executor/src/lib.rs
+++ b/ballista/executor/src/lib.rs
@@ -18,6 +18,7 @@
 #![doc = include_str!("../README.md")]
 
 pub mod collect;
+#[cfg(feature = "build-binary")]
 pub mod config;
 pub mod execution_engine;
 pub mod execution_loop;
diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml
index ad3e0963..fc3ca09a 100644
--- a/ballista/scheduler/Cargo.toml
+++ b/ballista/scheduler/Cargo.toml
@@ -32,34 +32,33 @@ scheduler = "scheduler_config_spec.toml"
 [[bin]]
 name = "ballista-scheduler"
 path = "src/bin/main.rs"
+required-features = ["build-binary"]
 
 [features]
-default = []
-flight-sql = []
+build-binary = ["configure_me", "clap", "tracing-subscriber", 
"tracing-appender", "tracing", "ballista-core/build-binary"]
+default = ["build-binary"]
+flight-sql = ["base64"]
 keda-scaler = []
 prometheus-metrics = ["prometheus", "once_cell"]
 rest-api = []
 
 [dependencies]
-anyhow = { workspace = true }
 arrow-flight = { workspace = true }
 async-trait = { workspace = true }
 axum = "0.7.7"
 ballista-core = { path = "../core", version = "0.12.0" }
-base64 = { version = "0.22" }
-clap = { workspace = true }
-configure_me = { workspace = true }
+base64 = { version = "0.22", optional = true }
+clap = { workspace = true, optional = true }
+configure_me = { workspace = true, optional = true }
 dashmap = { workspace = true }
 datafusion = { workspace = true }
 datafusion-proto = { workspace = true }
 futures = { workspace = true }
-graphviz-rust = "0.9.0"
 http = "1.1"
 log = { workspace = true }
 object_store = { workspace = true }
 once_cell = { version = "1.16.0", optional = true }
 parking_lot = { workspace = true }
-parse_arg = { workspace = true }
 prometheus = { version = "0.13", features = ["process"], optional = true }
 prost = { workspace = true }
 prost-types = { workspace = true }
@@ -68,13 +67,12 @@ serde = { workspace = true, features = ["derive"] }
 tokio = { workspace = true, features = ["full"] }
 tokio-stream = { workspace = true, features = ["net"] }
 tonic = { workspace = true }
-tracing = { workspace = true }
-tracing-appender = { workspace = true }
-tracing-subscriber = { workspace = true }
+tracing = { workspace = true, optional = true }
+tracing-appender = { workspace = true, optional = true }
+tracing-subscriber = { workspace = true, optional = true }
 uuid = { workspace = true }
 
 [dev-dependencies]
-ballista-core = { path = "../core", version = "0.12.0" }
 
 [build-dependencies]
 configure_me_codegen = { workspace = true }
diff --git a/ballista/scheduler/build.rs b/ballista/scheduler/build.rs
index 5a3e00cc..9f2f123f 100644
--- a/ballista/scheduler/build.rs
+++ b/ballista/scheduler/build.rs
@@ -15,10 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-extern crate configure_me_codegen;
-
 fn main() -> Result<(), String> {
+    #[cfg(feature = "build-binary")]
     println!("cargo:rerun-if-changed=scheduler_config_spec.toml");
+    #[cfg(feature = "build-binary")]
     configure_me_codegen::build_script_auto()
         .map_err(|e| format!("configure_me code generation failed: {e}"))?;
 
diff --git a/ballista/scheduler/src/bin/main.rs 
b/ballista/scheduler/src/bin/main.rs
index f6a06328..ea31810a 100644
--- a/ballista/scheduler/src/bin/main.rs
+++ b/ballista/scheduler/src/bin/main.rs
@@ -17,8 +17,8 @@
 
 //! Ballista Rust scheduler binary.
 
-use anyhow::Result;
 use ballista_core::config::LogRotationPolicy;
+use ballista_core::error::BallistaError;
 use ballista_core::print_version;
 use ballista_scheduler::cluster::BallistaCluster;
 use ballista_scheduler::config::{Config, ResultExt};
@@ -27,7 +27,7 @@ use std::sync::Arc;
 use std::{env, io};
 use tracing_subscriber::EnvFilter;
 
-fn main() -> Result<()> {
+fn main() -> ballista_core::error::Result<()> {
     let runtime = tokio::runtime::Builder::new_multi_thread()
         .enable_io()
         .enable_time()
@@ -37,7 +37,7 @@ fn main() -> Result<()> {
 
     runtime.block_on(inner())
 }
-async fn inner() -> Result<()> {
+async fn inner() -> ballista_core::error::Result<()> {
     // parse options
     let (opt, _remaining_args) =
         
Config::including_optional_config_files(&["/etc/ballista/scheduler.toml"])
@@ -85,7 +85,10 @@ async fn inner() -> Result<()> {
         tracing.init();
     }
     let addr = format!("{}:{}", opt.bind_host, opt.bind_port);
-    let addr = addr.parse()?;
+    let addr = addr.parse().map_err(|e: std::net::AddrParseError| {
+        BallistaError::Configuration(e.to_string())
+    })?;
+
     let config = opt.try_into()?;
     let cluster = BallistaCluster::new_from_config(&config).await?;
     start_server(cluster, addr, Arc::new(config)).await?;
diff --git a/ballista/scheduler/src/cluster/memory.rs 
b/ballista/scheduler/src/cluster/memory.rs
index c9eac564..07f646b8 100644
--- a/ballista/scheduler/src/cluster/memory.rs
+++ b/ballista/scheduler/src/cluster/memory.rs
@@ -37,7 +37,7 @@ use crate::scheduler_server::{timestamp_millis, 
timestamp_secs, SessionBuilder};
 use crate::state::session_manager::create_datafusion_context;
 use crate::state::task_manager::JobInfoCache;
 use ballista_core::serde::protobuf::job_status::Status;
-use log::{error, info, warn};
+use log::{debug, error, info, warn};
 use std::collections::{HashMap, HashSet};
 use std::ops::DerefMut;
 
@@ -45,7 +45,6 @@ use ballista_core::consistent_hash::node::Node;
 use datafusion::physical_plan::ExecutionPlan;
 use std::sync::Arc;
 use tokio::sync::{Mutex, MutexGuard};
-use tracing::debug;
 
 #[derive(Default)]
 pub struct InMemoryClusterState {
diff --git a/ballista/scheduler/src/cluster/mod.rs 
b/ballista/scheduler/src/cluster/mod.rs
index 94f86969..c54b0cea 100644
--- a/ballista/scheduler/src/cluster/mod.rs
+++ b/ballista/scheduler/src/cluster/mod.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use std::collections::{HashMap, HashSet};
-use std::fmt;
 use std::pin::Pin;
 use std::sync::Arc;
 
@@ -69,9 +68,9 @@ impl std::str::FromStr for ClusterStorage {
         ValueEnum::from_str(s, true)
     }
 }
-
-impl parse_arg::ParseArgFromStr for ClusterStorage {
-    fn describe_type<W: fmt::Write>(mut writer: W) -> fmt::Result {
+#[cfg(feature = "build-binary")]
+impl configure_me::parse_arg::ParseArgFromStr for ClusterStorage {
+    fn describe_type<W: std::fmt::Write>(mut writer: W) -> std::fmt::Result {
         write!(writer, "The cluster storage backend for the scheduler")
     }
 }
diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs
index 10c6df1d..b221ecb6 100644
--- a/ballista/scheduler/src/config.rs
+++ b/ballista/scheduler/src/config.rs
@@ -19,12 +19,12 @@
 //! Ballista scheduler specific configuration
 
 use crate::SessionBuilder;
-use ballista_core::{config::TaskSchedulingPolicy, error::BallistaError, 
ConfigProducer};
-use clap::ValueEnum;
+use ballista_core::{config::TaskSchedulingPolicy, ConfigProducer};
 use datafusion_proto::logical_plan::LogicalExtensionCodec;
 use datafusion_proto::physical_plan::PhysicalExtensionCodec;
-use std::{fmt, sync::Arc};
+use std::sync::Arc;
 
+#[cfg(feature = "build-binary")]
 include!(concat!(
     env!("OUT_DIR"),
     "/scheduler_configure_me_config.rs"
@@ -83,57 +83,6 @@ pub struct SchedulerConfig {
     pub override_physical_codec: Option<Arc<dyn PhysicalExtensionCodec>>,
 }
 
-impl std::fmt::Debug for SchedulerConfig {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("SchedulerConfig")
-            .field("namespace", &self.namespace)
-            .field("external_host", &self.external_host)
-            .field("bind_port", &self.bind_port)
-            .field("bind_host", &self.bind_host)
-            .field("scheduling_policy", &self.scheduling_policy)
-            .field("event_loop_buffer_size", &self.event_loop_buffer_size)
-            .field("task_distribution", &self.task_distribution)
-            .field(
-                "finished_job_data_clean_up_interval_seconds",
-                &self.finished_job_data_clean_up_interval_seconds,
-            )
-            .field(
-                "finished_job_state_clean_up_interval_seconds",
-                &self.finished_job_state_clean_up_interval_seconds,
-            )
-            .field(
-                "advertise_flight_sql_endpoint",
-                &self.advertise_flight_sql_endpoint,
-            )
-            .field("job_resubmit_interval_ms", &self.job_resubmit_interval_ms)
-            .field("cluster_storage", &self.cluster_storage)
-            .field(
-                "executor_termination_grace_period",
-                &self.executor_termination_grace_period,
-            )
-            .field(
-                "scheduler_event_expected_processing_duration",
-                &self.scheduler_event_expected_processing_duration,
-            )
-            .field(
-                "grpc_server_max_decoding_message_size",
-                &self.grpc_server_max_decoding_message_size,
-            )
-            .field(
-                "grpc_server_max_encoding_message_size",
-                &self.grpc_server_max_encoding_message_size,
-            )
-            .field("executor_timeout_seconds", &self.executor_timeout_seconds)
-            .field(
-                "expire_dead_executor_interval_seconds",
-                &self.expire_dead_executor_interval_seconds,
-            )
-            .field("override_logical_codec", &self.override_logical_codec)
-            .field("override_physical_codec", &self.override_physical_codec)
-            .finish()
-    }
-}
-
 impl Default for SchedulerConfig {
     fn default() -> Self {
         Self {
@@ -261,7 +210,8 @@ pub enum ClusterStorageConfig {
 /// Policy of distributing tasks to available executor slots
 ///
 /// It needs to be visible to code generated by configure_me
-#[derive(Clone, ValueEnum, Copy, Debug, serde::Deserialize)]
+#[derive(Clone, Copy, Debug, serde::Deserialize)]
+#[cfg_attr(feature = "build-binary", derive(clap::ValueEnum))]
 pub enum TaskDistribution {
     /// Eagerly assign tasks to executor slots. This will assign as many task 
slots per executor
     /// as are currently available
@@ -276,16 +226,18 @@ pub enum TaskDistribution {
     ConsistentHash,
 }
 
+#[cfg(feature = "build-binary")]
 impl std::str::FromStr for TaskDistribution {
     type Err = String;
 
     fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
-        ValueEnum::from_str(s, true)
+        clap::ValueEnum::from_str(s, true)
     }
 }
 
-impl parse_arg::ParseArgFromStr for TaskDistribution {
-    fn describe_type<W: fmt::Write>(mut writer: W) -> fmt::Result {
+#[cfg(feature = "build-binary")]
+impl configure_me::parse_arg::ParseArgFromStr for TaskDistribution {
+    fn describe_type<W: std::fmt::Write>(mut writer: W) -> std::fmt::Result {
         write!(writer, "The executor slots policy for the scheduler")
     }
 }
@@ -308,9 +260,9 @@ pub enum TaskDistributionPolicy {
         tolerance: usize,
     },
 }
-
+#[cfg(feature = "build-binary")]
 impl TryFrom<Config> for SchedulerConfig {
-    type Error = BallistaError;
+    type Error = ballista_core::error::BallistaError;
 
     fn try_from(opt: Config) -> Result<Self, Self::Error> {
         let task_distribution = match opt.task_distribution {
diff --git a/ballista/scheduler/src/scheduler_process.rs 
b/ballista/scheduler/src/scheduler_process.rs
index 393b03b6..bf6d484f 100644
--- a/ballista/scheduler/src/scheduler_process.rs
+++ b/ballista/scheduler/src/scheduler_process.rs
@@ -15,9 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use anyhow::{Error, Result};
 #[cfg(feature = "flight-sql")]
 use arrow_flight::flight_service_server::FlightServiceServer;
+use ballista_core::error::BallistaError;
 use ballista_core::serde::protobuf::scheduler_grpc_server::SchedulerGrpcServer;
 use ballista_core::serde::{
     BallistaCodec, BallistaLogicalExtensionCodec, 
BallistaPhysicalExtensionCodec,
@@ -43,7 +43,7 @@ pub async fn start_server(
     cluster: BallistaCluster,
     addr: SocketAddr,
     config: Arc<SchedulerConfig>,
-) -> Result<()> {
+) -> ballista_core::error::Result<()> {
     info!(
         "Ballista v{} Scheduler listening on {:?}",
         BALLISTA_VERSION, addr
@@ -109,9 +109,9 @@ pub async fn start_server(
 
     let listener = tokio::net::TcpListener::bind(&addr)
         .await
-        .map_err(Error::from)?;
+        .map_err(BallistaError::from)?;
 
     axum::serve(listener, final_route)
         .await
-        .map_err(Error::from)
+        .map_err(BallistaError::from)
 }
diff --git a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs 
b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
index c3f3e7eb..b9b49c7f 100644
--- a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -359,14 +359,9 @@ mod tests {
     use datafusion::test_util::scan_empty_with_partitions;
     use std::sync::Arc;
     use std::time::Duration;
-    use tracing_subscriber::EnvFilter;
 
     #[tokio::test]
     async fn test_pending_job_metric() -> Result<()> {
-        tracing_subscriber::fmt()
-            .with_env_filter(EnvFilter::from_default_env())
-            .init();
-
         let plan = test_plan(10);
 
         let metrics_collector = Arc::new(TestMetricsCollector::default());
diff --git a/ballista/scheduler/src/state/task_manager.rs 
b/ballista/scheduler/src/state/task_manager.rs
index 2e5b76b4..cc8442f2 100644
--- a/ballista/scheduler/src/state/task_manager.rs
+++ b/ballista/scheduler/src/state/task_manager.rs
@@ -38,7 +38,7 @@ use dashmap::DashMap;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion_proto::logical_plan::AsLogicalPlan;
 use datafusion_proto::physical_plan::AsExecutionPlan;
-use log::{debug, error, info, warn};
+use log::{debug, error, info, trace, warn};
 use rand::distributions::Alphanumeric;
 use rand::{thread_rng, Rng};
 use std::collections::{HashMap, HashSet};
@@ -48,8 +48,6 @@ use std::time::Duration;
 use std::time::{SystemTime, UNIX_EPOCH};
 use tokio::sync::RwLock;
 
-use tracing::trace;
-
 type ActiveJobCache = Arc<DashMap<String, JobInfoCache>>;
 
 // TODO move to configuration file
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 4701e9c3..941ec849 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -25,7 +25,6 @@ homepage = "https://github.com/apache/arrow-ballista";
 repository = "https://github.com/apache/arrow-ballista";
 license = "Apache-2.0"
 publish = false
-rust-version = "1.72"
 
 [features]
 ci = []
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index 65d9cd94..743ff826 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -26,7 +26,6 @@ license = "Apache-2.0"
 keywords = ["arrow", "distributed", "query", "sql"]
 edition = "2021"
 publish = false
-rust-version = "1.72"
 
 [[example]]
 name = "standalone_sql"
@@ -34,11 +33,10 @@ path = "examples/standalone-sql.rs"
 required-features = ["ballista/standalone"]
 
 [dependencies]
-anyhow = { workspace = true }
 ballista = { path = "../ballista/client", version = "0.12.0" }
 ballista-core = { path = "../ballista/core", version = "0.12.0" }
-ballista-executor = { path = "../ballista/executor", version = "0.12.0" }
-ballista-scheduler = { path = "../ballista/scheduler", version = "0.12.0" }
+ballista-executor = { path = "../ballista/executor", version = "0.12.0", 
default-features = false }
+ballista-scheduler = { path = "../ballista/scheduler", version = "0.12.0", 
default-features = false }
 datafusion = { workspace = true }
 env_logger = { workspace = true }
 log = { workspace = true }
diff --git a/examples/examples/custom-executor.rs 
b/examples/examples/custom-executor.rs
index df3f7c24..53418212 100644
--- a/examples/examples/custom-executor.rs
+++ b/examples/examples/custom-executor.rs
@@ -15,11 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use anyhow::Result;
 use ballista_examples::object_store::{
     custom_runtime_env_with_s3_support, custom_session_config_with_s3_options,
 };
-use ballista_executor::config::prelude::*;
+
 use ballista_executor::executor_process::{
     start_executor_process, ExecutorProcessConfig,
 };
@@ -31,34 +30,23 @@ use std::sync::Arc;
 /// This example demonstrates how to crate custom ballista executors.
 ///
 #[tokio::main]
-async fn main() -> Result<()> {
+async fn main() -> ballista_core::error::Result<()> {
     let _ = env_logger::builder()
         .filter_level(log::LevelFilter::Info)
         .is_test(true)
         .try_init();
 
-    let (opt, _remaining_args) =
-        
Config::including_optional_config_files(&["/etc/ballista/executor.toml"])
-            .unwrap_or_exit();
-
-    if opt.version {
-        ballista_core::print_version();
-        std::process::exit(0);
-    }
-
-    let mut config: ExecutorProcessConfig = opt.try_into().unwrap();
-
-    // overriding default config producer with custom producer
-    // which has required S3 configuration options
-    config.override_config_producer =
-        Some(Arc::new(custom_session_config_with_s3_options));
-
-    // overriding default runtime producer with custom producer
-    // which knows how to create S3 connections
-    config.override_runtime_producer =
-        Some(Arc::new(|session_config: &SessionConfig| {
+    let config: ExecutorProcessConfig = ExecutorProcessConfig {
+        // overriding default config producer with custom producer
+        // which has required S3 configuration options
+        override_config_producer: 
Some(Arc::new(custom_session_config_with_s3_options)),
+        // overriding default runtime producer with custom producer
+        // which knows how to create S3 connections
+        override_runtime_producer: Some(Arc::new(|session_config: 
&SessionConfig| {
             custom_runtime_env_with_s3_support(session_config)
-        }));
+        })),
+        ..Default::default()
+    };
 
     start_executor_process(Arc::new(config)).await
 }
diff --git a/examples/examples/custom-scheduler.rs 
b/examples/examples/custom-scheduler.rs
index 30aeb3e3..9783ae28 100644
--- a/examples/examples/custom-scheduler.rs
+++ b/examples/examples/custom-scheduler.rs
@@ -15,52 +15,46 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use anyhow::Result;
-use ballista_core::print_version;
+use ballista_core::error::BallistaError;
 use ballista_examples::object_store::{
     custom_session_config_with_s3_options, 
custom_session_state_with_s3_support,
 };
 use ballista_scheduler::cluster::BallistaCluster;
-use ballista_scheduler::config::{Config, ResultExt, SchedulerConfig};
+use ballista_scheduler::config::SchedulerConfig;
 use ballista_scheduler::scheduler_process::start_server;
 use datafusion::prelude::SessionConfig;
+use std::net::AddrParseError;
 use std::sync::Arc;
 
 ///
 /// # Custom Ballista Scheduler
 ///
-/// This example demonstrates how to crate custom made ballista schedulers.
+/// This example demonstrates how to crate custom ballista schedulers.
 ///
 #[tokio::main]
-async fn main() -> Result<()> {
+async fn main() -> ballista_core::error::Result<()> {
     let _ = env_logger::builder()
         .filter_level(log::LevelFilter::Info)
         .is_test(true)
         .try_init();
 
-    // parse options
-    let (opt, _remaining_args) =
-        
Config::including_optional_config_files(&["/etc/ballista/scheduler.toml"])
-            .unwrap_or_exit();
+    let config: SchedulerConfig = SchedulerConfig {
+        // overriding default runtime producer with custom producer
+        // which knows how to create S3 connections
+        override_config_producer: 
Some(Arc::new(custom_session_config_with_s3_options)),
+        // overriding default session builder, which has custom session 
configuration
+        // runtime environment and session state.
+        override_session_builder: Some(Arc::new(|session_config: 
SessionConfig| {
+            custom_session_state_with_s3_support(session_config)
+        })),
+        ..Default::default()
+    };
 
-    if opt.version {
-        print_version();
-        std::process::exit(0);
-    }
+    let addr = format!("{}:{}", config.bind_host, config.bind_port);
+    let addr = addr
+        .parse()
+        .map_err(|e: AddrParseError| 
BallistaError::Configuration(e.to_string()))?;
 
-    let addr = format!("{}:{}", opt.bind_host, opt.bind_port);
-    let addr = addr.parse()?;
-    let mut config: SchedulerConfig = opt.try_into()?;
-
-    // overriding default runtime producer with custom producer
-    // which knows how to create S3 connections
-    config.override_config_producer =
-        Some(Arc::new(custom_session_config_with_s3_options));
-    // overriding default session builder, which has custom session 
configuration
-    // runtime environment and session state.
-    config.override_session_builder = Some(Arc::new(|session_config: 
SessionConfig| {
-        custom_session_state_with_s3_support(session_config)
-    }));
     let cluster = BallistaCluster::new_from_config(&config).await?;
     start_server(cluster, addr, Arc::new(config)).await?;
 
diff --git a/python/Cargo.toml b/python/Cargo.toml
index 747f330a..f7083822 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -25,7 +25,6 @@ description = "Apache Arrow Ballista Python Client"
 readme = "README.md"
 license = "Apache-2.0"
 edition = "2021"
-rust-version = "1.72"
 include = ["/src", "/ballista", "/LICENSE.txt", "pyproject.toml", 
"Cargo.toml", "Cargo.lock"]
 publish = false
 
@@ -33,15 +32,15 @@ publish = false
 async-trait = "0.1.77"
 ballista = { path = "../ballista/client", version = "0.12.0" }
 ballista-core = { path = "../ballista/core", version = "0.12.0" }
-ballista-executor = { path = "../ballista/executor", version = "0.12.0" }
-ballista-scheduler = { path = "../ballista/scheduler", version = "0.12.0" }
+ballista-executor = { path = "../ballista/executor", version = "0.12.0", 
default-features = false }
+ballista-scheduler = { path = "../ballista/scheduler", version = "0.12.0", 
default-features = false }
 datafusion = { version = "42", features = ["pyarrow", "avro"] }
 datafusion-proto = { version = "42" }
 datafusion-python = { version = "42" }
 
 pyo3 = { version = "0.22", features = ["extension-module", "abi3", 
"abi3-py38"] }
 pyo3-log = "0.11.0"
-tokio = { version = "1.35", features = ["macros", "rt", "rt-multi-thread", 
"sync"] }
+tokio = { version = "1.42", features = ["macros", "rt", "rt-multi-thread", 
"sync"] }
 
 [lib]
 crate-type = ["cdylib"]
diff --git a/python/src/cluster.rs b/python/src/cluster.rs
index aa4260ce..848fc488 100644
--- a/python/src/cluster.rs
+++ b/python/src/cluster.rs
@@ -128,8 +128,9 @@ impl PyScheduler {
 
     pub fn __repr__(&self) -> String {
         format!(
-            "BallistaScheduler(config={:?}, listening= {})",
-            self.config,
+            "BallistaScheduler(listening address={}:{}, listening= {})",
+            self.config.bind_host,
+            self.config.bind_port,
             self.handle.is_some()
         )
     }
@@ -246,18 +247,19 @@ impl PyExecutor {
                 self.config.bind_host,
                 self.config.port,
                 self.config.scheduler_host,
-                self.config.scheduler_port
+                self.config.scheduler_port,
             ),
         }
     }
 
     pub fn __repr__(&self) -> String {
         format!(
-            "BallistaExecutor(address={}:{}, scheduler={}:{}, listening={})",
+            "BallistaExecutor(address={}:{}, scheduler={}:{}, 
concurrent_tasks={} listening={})",
             self.config.bind_host,
             self.config.port,
             self.config.scheduler_host,
             self.config.scheduler_port,
+            self.config.concurrent_tasks,
             self.handle.is_some()
         )
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to