This is an automated email from the ASF dual-hosted git repository. agrove pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push: new cb38c050 fix: remove configure_me (#1282) cb38c050 is described below commit cb38c0502dee5d406dad29ca6721c1f7661f2af1 Author: Marko Milenković <milenkov...@users.noreply.github.com> AuthorDate: Wed Jul 30 15:03:33 2025 +0200 fix: remove configure_me (#1282) * remove configure_me from executor * move scheduler to clap * fix clippy and remove configure_me from other parts * remove build.rs as not used anymore --- Cargo.lock | 128 +---------------------- Cargo.toml | 2 - ballista/core/Cargo.toml | 3 +- ballista/core/src/config.rs | 36 ++++--- ballista/executor/Cargo.toml | 8 +- ballista/executor/build.rs | 26 ----- ballista/executor/executor_config_spec.toml | 139 ------------------------- ballista/executor/src/bin/main.rs | 13 +-- ballista/executor/src/config.rs | 94 ++++++++++++++++- ballista/scheduler/Cargo.toml | 7 +- ballista/scheduler/build.rs | 7 -- ballista/scheduler/src/bin/main.rs | 13 +-- ballista/scheduler/src/cluster/mod.rs | 7 -- ballista/scheduler/src/config.rs | 155 +++++++++++++++++++++++++--- 14 files changed, 263 insertions(+), 375 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ee6a0552..6ed5450f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -974,7 +974,6 @@ dependencies = [ "aws-credential-types", "chrono", "clap 4.5.40", - "configure_me", "datafusion", "datafusion-proto", "datafusion-proto-common", @@ -1026,8 +1025,7 @@ dependencies = [ "arrow-flight", "async-trait", "ballista-core", - "configure_me", - "configure_me_codegen", + "clap 4.5.40", "dashmap", "datafusion", "datafusion-proto", @@ -1055,8 +1053,6 @@ dependencies = [ "axum", "ballista-core", "clap 4.5.40", - "configure_me", - "configure_me_codegen", "dashmap", "datafusion", "datafusion-proto", @@ -1312,16 +1308,6 @@ dependencies = [ "pkg-config", ] -[[package]] -name = "cargo_toml" -version = "0.20.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "88da5a13c620b4ca0078845707ea9c3faf11edbc3ffd8497d11d686211cd1ac0" -dependencies = [ - "serde", - "toml 0.8.23", -] - [[package]] name = "cc" version = "1.2.27" @@ -1484,34 +1470,6 @@ dependencies = [ "unicode-width 0.2.1", ] -[[package]] -name = "configure_me" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d03c1fbdead926855bdafee8ddf16cd42efb3c75d8cde8c87f8937b99510b39d" -dependencies = [ - "parse_arg 0.1.6", - "serde", - "serde_derive", - "toml 0.5.11", -] - -[[package]] -name = "configure_me_codegen" -version = "0.4.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e56840275667a19b0e8ab80219c81fb0bd924e567366d9f12aa385fb45511ea" -dependencies = [ - "cargo_toml", - "fmt2io", - "man", - "serde", - "serde_derive", - "toml 0.5.11", - "unicode-segmentation", - "void", -] - [[package]] name = "const-random" version = "0.1.18" @@ -2605,12 +2563,6 @@ dependencies = [ "miniz_oxide", ] -[[package]] -name = "fmt2io" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b6129284da9f7e5296cc22183a63f24300e945e297705dcc0672f7df01d62c8" - [[package]] name = "fnv" version = "1.0.7" @@ -3562,15 +3514,6 @@ dependencies = [ "pkg-config", ] -[[package]] -name = "man" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ccbbb1d623a3cbcaeef9a072f7ccbd6f8ca1e788f3e301d5c49bdd67b1f5a942" -dependencies = [ - "roff", -] - [[package]] name = "matchers" version = "0.1.0" @@ -3955,21 +3898,6 @@ dependencies = [ "regex", ] -[[package]] -name = "parse_arg" -version = "0.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32f05bccc8b6036fec4e0c511954e3997987a82acb6a0b50642ecf7c744fe225" -dependencies = [ - "parse_arg 1.0.1", -] - -[[package]] -name = "parse_arg" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5bddc33f680b79eaf1e2e56da792c3c2236f86985bbc3a886e8ddee17ae4d3a4" - [[package]] name = "paste" version = "1.0.15" @@ -4658,12 +4586,6 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3582f63211428f83597b51b2ddb88e2a91a9d52d12831f9d08f5e624e8977422" -[[package]] -name = "roff" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e33e4fb37ba46888052c763e4ec2acfedd8f00f62897b630cadb6298b833675e" - [[package]] name = "rstest" version = "0.25.0" @@ -4960,15 +4882,6 @@ dependencies = [ "syn 2.0.103", ] -[[package]] -name = "serde_spanned" -version = "0.6.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf41e0cfaf7226dca15e8197172c295a782857fcb97fad1808a166870dee75a3" -dependencies = [ - "serde", -] - [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -5537,35 +5450,11 @@ dependencies = [ "tokio", ] -[[package]] -name = "toml" -version = "0.5.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4f7f0dd8d50a853a531c426359045b1998f04219d88799810762cd4ad314234" -dependencies = [ - "serde", -] - -[[package]] -name = "toml" -version = "0.8.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc1beb996b9d83529a9e75c17a1686767d148d70663143c7854d8b4a09ced362" -dependencies = [ - "serde", - "serde_spanned", - "toml_datetime", - "toml_edit", -] - [[package]] name = "toml_datetime" version = "0.6.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22cddaf88f4fbc13c51aebbf5f8eceb5c7c5a9da2ac40a13519eb5b0a0e8f11c" -dependencies = [ - "serde", -] [[package]] name = "toml_edit" @@ -5574,19 +5463,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "41fe8c660ae4257887cf66394862d21dbca4a6ddd26f04a3560410406a2f819a" dependencies = [ "indexmap 2.9.0", - "serde", - "serde_spanned", "toml_datetime", - "toml_write", "winnow", ] -[[package]] -name = "toml_write" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801" - [[package]] name = "tonic" version = "0.12.3" @@ -5909,12 +5789,6 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" -[[package]] -name = "void" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" - [[package]] name = "vsimd" version = "0.8.0" diff --git a/Cargo.toml b/Cargo.toml index db3990da..aac2c5df 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,8 +33,6 @@ rust-version = "1.82.0" arrow = { version = "55", features = ["ipc_compression"] } arrow-flight = { version = "55", features = ["flight-sql-experimental"] } clap = { version = "4.5", features = ["derive", "cargo"] } -configure_me = { version = "0.4.0" } -configure_me_codegen = { version = "0.4.4" } datafusion = "48.0.0" datafusion-cli = "48.0.0" datafusion-proto = "48.0.0" diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml index 413895dc..58075d85 100644 --- a/ballista/core/Cargo.toml +++ b/ballista/core/Cargo.toml @@ -35,7 +35,7 @@ exclude = ["*.proto"] rustc-args = ["--cfg", "docsrs"] [features] -build-binary = ["aws-config", "aws-credential-types", "configure_me", "clap", "object_store"] +build-binary = ["aws-config", "aws-credential-types", "clap", "object_store"] docsrs = [] # Used for testing ONLY: causes all values to hash to the same value (test for collisions) force_hash_collisions = ["datafusion/force_hash_collisions"] @@ -47,7 +47,6 @@ aws-config = { version = "1.6.0", optional = true } aws-credential-types = { version = "1.2.0", optional = true } chrono = { version = "0.4", default-features = false } clap = { workspace = true, optional = true } -configure_me = { workspace = true, optional = true } datafusion = { workspace = true } datafusion-proto = { workspace = true } datafusion-proto-common = { workspace = true } diff --git a/ballista/core/src/config.rs b/ballista/core/src/config.rs index a7757eeb..b5b0ea23 100644 --- a/ballista/core/src/config.rs +++ b/ballista/core/src/config.rs @@ -18,8 +18,8 @@ //! Ballista configuration -use std::collections::HashMap; use std::result; +use std::{collections::HashMap, fmt::Display}; use crate::error::{BallistaError, Result}; @@ -259,7 +259,6 @@ impl datafusion::config::ConfigExtension for BallistaConfig { } // an enum used to configure the scheduler policy -// needs to be visible to code generated by configure_me /// Ballista supports both push-based and pull-based task scheduling. /// It is recommended that you try both to determine which is the best for your use case. @@ -272,6 +271,14 @@ pub enum TaskSchedulingPolicy { /// push-based scheduling can result in lower latency. PushStaged, } +impl Display for TaskSchedulingPolicy { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + TaskSchedulingPolicy::PullStaged => f.write_str("pull-staged"), + TaskSchedulingPolicy::PushStaged => f.write_str("push-staged"), + } + } +} #[cfg(feature = "build-binary")] impl std::str::FromStr for TaskSchedulingPolicy { @@ -281,15 +288,8 @@ impl std::str::FromStr for TaskSchedulingPolicy { clap::ValueEnum::from_str(s, true) } } -#[cfg(feature = "build-binary")] -impl configure_me::parse_arg::ParseArgFromStr for TaskSchedulingPolicy { - fn describe_type<W: core::fmt::Write>(mut writer: W) -> core::fmt::Result { - write!(writer, "The scheduler policy for the scheduler") - } -} // an enum used to configure the log rolling policy -// needs to be visible to code generated by configure_me #[derive(Clone, Copy, Debug, serde::Deserialize, Default)] #[cfg_attr(feature = "build-binary", derive(clap::ValueEnum))] pub enum LogRotationPolicy { @@ -300,6 +300,17 @@ pub enum LogRotationPolicy { Never, } +impl Display for LogRotationPolicy { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + LogRotationPolicy::Minutely => f.write_str("minutely"), + LogRotationPolicy::Hourly => f.write_str("hourly"), + LogRotationPolicy::Daily => f.write_str("daily"), + LogRotationPolicy::Never => f.write_str("never"), + } + } +} + #[cfg(feature = "build-binary")] impl std::str::FromStr for LogRotationPolicy { type Err = String; @@ -309,13 +320,6 @@ impl std::str::FromStr for LogRotationPolicy { } } -#[cfg(feature = "build-binary")] -impl configure_me::parse_arg::ParseArgFromStr for LogRotationPolicy { - fn describe_type<W: core::fmt::Write>(mut writer: W) -> core::fmt::Result { - write!(writer, "The log rotation policy") - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/ballista/executor/Cargo.toml b/ballista/executor/Cargo.toml index 5790878e..f0e5695d 100644 --- a/ballista/executor/Cargo.toml +++ b/ballista/executor/Cargo.toml @@ -27,16 +27,13 @@ authors = ["Apache DataFusion <d...@datafusion.apache.org>"] edition = { workspace = true } rust-version = { workspace = true } -[package.metadata.configure_me.bin] -executor = "executor_config_spec.toml" - [[bin]] name = "ballista-executor" path = "src/bin/main.rs" required-features = ["build-binary"] [features] -build-binary = ["configure_me", "tracing-subscriber", "tracing-appender", "tracing", "ballista-core/build-binary"] +build-binary = ["clap", "tracing-subscriber", "tracing-appender", "tracing", "ballista-core/build-binary"] default = ["build-binary", "mimalloc"] [dependencies] @@ -44,7 +41,7 @@ arrow = { workspace = true } arrow-flight = { workspace = true } async-trait = { workspace = true } ballista-core = { path = "../core", version = "48.0.0" } -configure_me = { workspace = true, optional = true } +clap = { workspace = true, optional = true } dashmap = { workspace = true } datafusion = { workspace = true } datafusion-proto = { workspace = true } @@ -64,7 +61,6 @@ uuid = { workspace = true } [dev-dependencies] [build-dependencies] -configure_me_codegen = { workspace = true } # use libc on unix like platforms to set worker priority in DedicatedExecutor [target."cfg(unix)".dependencies.libc] diff --git a/ballista/executor/build.rs b/ballista/executor/build.rs deleted file mode 100644 index 21ce2d8f..00000000 --- a/ballista/executor/build.rs +++ /dev/null @@ -1,26 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -fn main() -> Result<(), String> { - #[cfg(feature = "build-binary")] - println!("cargo:rerun-if-changed=executor_config_spec.toml"); - #[cfg(feature = "build-binary")] - configure_me_codegen::build_script_auto() - .map_err(|e| format!("configure_me code generation failed: {e}"))?; - - Ok(()) -} diff --git a/ballista/executor/executor_config_spec.toml b/ballista/executor/executor_config_spec.toml deleted file mode 100644 index cca74ec3..00000000 --- a/ballista/executor/executor_config_spec.toml +++ /dev/null @@ -1,139 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -[general] -name = "Ballista Executor" -env_prefix = "BALLISTA_EXECUTOR" -conf_file_param = "config_file" - -[[switch]] -name = "version" -doc = "Print version of this executable" - -[[param]] -name = "scheduler_host" -type = "String" -default = "std::string::String::from(\"localhost\")" -doc = "Scheduler host" - -[[param]] -name = "scheduler_port" -type = "u16" -default = "50050" -doc = "scheduler port" - -[[param]] -name = "bind_host" -type = "String" -default = "std::string::String::from(\"0.0.0.0\")" -doc = "Local IP address to bind to." - -[[param]] -name = "external_host" -type = "String" -doc = "Host name or IP address to register with scheduler so that other executors can connect to this executor. If none is provided, the scheduler will use the connecting IP address to communicate with the executor." - -[[param]] -abbr = "p" -name = "bind_port" -type = "u16" -default = "50051" -doc = "bind port" - -[[param]] -name = "bind_grpc_port" -type = "u16" -default = "50052" -doc = "bind grpc service port" - -[[param]] -name = "scheduler_connect_timeout_seconds" -type = "u16" -default = "0" -doc = "How long to try connecting to scheduler before failing. Set to zero to fail after first attempt." - -[[param]] -name = "work_dir" -type = "String" -doc = "Directory for temporary IPC files" - -[[param]] -abbr = "c" -name = "concurrent_tasks" -type = "usize" -default = "0" # defaults to all available cores if left as zero -doc = "Max concurrent tasks." - -[[param]] -abbr = "s" -name = "task_scheduling_policy" -type = "ballista_core::config::TaskSchedulingPolicy" -doc = "The task scheduing policy for the scheduler, possible values: pull-staged, push-staged. Default: pull-staged" -default = "ballista_core::config::TaskSchedulingPolicy::PullStaged" - -[[param]] -name = "job_data_clean_up_interval_seconds" -type = "u64" -doc = "Controls the interval in seconds, which the worker cleans up old job dirs on the local machine. 0 means the clean up is disabled" -default = "0" - -[[param]] -name = "job_data_ttl_seconds" -type = "u64" -doc = "The number of seconds to retain job directories on each worker 604800 (7 days, 7 * 24 * 3600), In other words, after job done, how long the resulting data is retained" -default = "604800" - -[[param]] -name = "log_dir" -type = "String" -doc = "Log dir: a path to save log. This will create a new storage directory at the specified path if it does not already exist." - -[[param]] -name = "print_thread_info" -type = "bool" -doc = "Enable print thread ids and names in log file." -default = "true" - -[[param]] -name = "log_level_setting" -type = "String" -doc = "special log level for sub mod. link: https://docs.rs/env_logger/latest/env_logger/#enabling-logging. For example we want whole level is INFO but datafusion mode is DEBUG" -default = "std::string::String::from(\"INFO,datafusion=INFO\")" - -[[param]] -name = "log_rotation_policy" -type = "ballista_core::config::LogRotationPolicy" -doc = "Tracing log rotation policy, possible values: minutely, hourly, daily, never. Default: daily" -default = "ballista_core::config::LogRotationPolicy::Daily" - -[[param]] -name = "grpc_server_max_decoding_message_size" -type = "u32" -default = "16777216" -doc = "The maximum size of a decoded message at the grpc server side. Default: 16MB" - -[[param]] -name = "grpc_server_max_encoding_message_size" -type = "u32" -default = "16777216" -doc = "The maximum size of an encoded message at the grpc server side. Default: 16MB" - -[[param]] -name = "executor_heartbeat_interval_seconds" -type = "u64" -doc = "The heartbeat interval in seconds to the scheduler for push-based task scheduling" -default = "60" diff --git a/ballista/executor/src/bin/main.rs b/ballista/executor/src/bin/main.rs index 50a6154f..96d3e1fd 100644 --- a/ballista/executor/src/bin/main.rs +++ b/ballista/executor/src/bin/main.rs @@ -21,11 +21,11 @@ use ballista_core::config::LogRotationPolicy; use ballista_core::object_store::{ runtime_env_with_s3_support, session_config_with_s3_support, }; -use ballista_core::print_version; -use ballista_executor::config::prelude::*; +use ballista_executor::config::Config; use ballista_executor::executor_process::{ start_executor_process, ExecutorProcessConfig, }; +use clap::Parser; use std::env; use std::sync::Arc; use tracing_subscriber::EnvFilter; @@ -37,14 +37,7 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; #[tokio::main] async fn main() -> ballista_core::error::Result<()> { // parse command-line arguments - let (opt, _remaining_args) = - Config::including_optional_config_files(&["/etc/ballista/executor.toml"]) - .unwrap_or_exit(); - - if opt.version { - print_version(); - std::process::exit(0); - } + let opt = Config::parse(); let mut config: ExecutorProcessConfig = opt.try_into()?; config.override_config_producer = Some(Arc::new(session_config_with_s3_support)); diff --git a/ballista/executor/src/config.rs b/ballista/executor/src/config.rs index 53ffc10e..9fa2472b 100644 --- a/ballista/executor/src/config.rs +++ b/ballista/executor/src/config.rs @@ -25,10 +25,96 @@ use ballista_core::error::BallistaError; use crate::executor_process::ExecutorProcessConfig; -// Ideally we would use the include_config macro from configure_me, but then we cannot use -// #[allow(clippy::all)] to silence clippy warnings from the generated code - -include!(concat!(env!("OUT_DIR"), "/executor_configure_me_config.rs")); +#[cfg(feature = "build-binary")] +#[derive(clap::Parser, Debug)] +#[command(version, about, long_about = None)] +pub struct Config { + #[arg(long, default_value_t = String::from("localhost"), help = "Scheduler host")] + pub scheduler_host: String, + #[arg(long, default_value_t = 50050, help = "scheduler port")] + pub scheduler_port: u16, + #[arg(long, default_value_t = String::from("0.0.0.0"), help = "Local IP address to bind to.")] + pub bind_host: String, + #[arg( + long, + help = "Host name or IP address to register with scheduler so that other executors can connect to this executor. If none is provided, the scheduler will use the connecting IP address to communicate with the executor." + )] + pub external_host: Option<String>, + #[arg(short = 'p', long, default_value_t = 50051, help = "bind port")] + pub bind_port: u16, + #[arg(long, default_value_t = 50052, help = "bind grpc service port")] + pub bind_grpc_port: u16, + #[arg( + long, + default_value_t = 0, + help = "How long to try connecting to scheduler before failing. Set to zero to fail after first attempt." + )] + pub scheduler_connect_timeout_seconds: u16, + #[arg(long, help = "Directory for temporary IPC files")] + pub work_dir: Option<String>, + #[arg( + short = 'c', + long, + default_value_t = 0, + help = "Max concurrent tasks. (defaults to all available cores if left as zero)" + )] + pub concurrent_tasks: usize, + #[arg(short = 's', long, default_value_t = ballista_core::config::TaskSchedulingPolicy::PullStaged, help = "The task scheduling policy for the scheduler, possible values: pull-staged, push-staged. Default: pull-staged")] + pub task_scheduling_policy: ballista_core::config::TaskSchedulingPolicy, + #[arg( + long, + default_value_t = 0, + help = "Controls the interval in seconds, which the worker cleans up old job dirs on the local machine. 0 means the clean up is disabled" + )] + pub job_data_clean_up_interval_seconds: u64, + #[arg( + long, + default_value_t = 604800, + help = "The number of seconds to retain job directories on each worker 604800 (7 days, 7 * 24 * 3600), In other words, after job done, how long the resulting data is retained" + )] + pub job_data_ttl_seconds: u64, + #[arg( + long, + help = "Log dir: a path to save log. This will create a new storage directory at the specified path if it does not already exist." + )] + pub log_dir: Option<String>, + #[arg( + long, + default_value_t = true, + help = "Enable print thread ids and names in log file." + )] + pub print_thread_info: bool, + #[arg( + long, + default_value_t = String::from("INFO,datafusion=INFO"), + help = "special log level for sub mod. link: https://docs.rs/env_logger/latest/env_logger/#enabling-logging. For example we want whole level is INFO but datafusion mode is DEBUG" + )] + pub log_level_setting: String, + #[arg( + long, + default_value_t = ballista_core::config::LogRotationPolicy::Daily, + help = "Tracing log rotation policy, possible values: minutely, hourly, daily, never. Default: daily" + )] + pub log_rotation_policy: ballista_core::config::LogRotationPolicy, + #[arg( + long, + default_value_t = 16777216, + help = "The maximum size of a decoded message at the grpc server side. Default: 16MB" + )] + pub grpc_server_max_decoding_message_size: u32, + #[arg( + long, + default_value_t = 16777216, + help = "The maximum size of an encoded message at the grpc server side. Default: 16MB" + )] + pub grpc_server_max_encoding_message_size: u32, + #[arg( + long, + default_value_t = 60, + help = "The heartbeat interval in seconds to the scheduler for push-based task scheduling" + )] + pub executor_heartbeat_interval_seconds: u64, +} impl TryFrom<Config> for ExecutorProcessConfig { type Error = BallistaError; diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml index 41f6469e..ad304155 100644 --- a/ballista/scheduler/Cargo.toml +++ b/ballista/scheduler/Cargo.toml @@ -27,16 +27,13 @@ authors = ["Apache DataFusion <d...@datafusion.apache.org>"] edition = { workspace = true } rust-version = { workspace = true } -[package.metadata.configure_me.bin] -scheduler = "scheduler_config_spec.toml" - [[bin]] name = "ballista-scheduler" path = "src/bin/main.rs" required-features = ["build-binary"] [features] -build-binary = ["configure_me", "clap", "tracing-subscriber", "tracing-appender", "tracing", "ballista-core/build-binary"] +build-binary = ["clap", "tracing-subscriber", "tracing-appender", "tracing", "ballista-core/build-binary"] default = ["build-binary"] # job info can cache stage plans, in some cases where # task plans can be re-computed, cache behavior may need to be disabled. @@ -52,7 +49,6 @@ async-trait = { workspace = true } axum = "0.7.7" ballista-core = { path = "../core", version = "48.0.0" } clap = { workspace = true, optional = true } -configure_me = { workspace = true, optional = true } dashmap = { workspace = true } datafusion = { workspace = true } datafusion-proto = { workspace = true } @@ -79,5 +75,4 @@ uuid = { workspace = true } [dev-dependencies] [build-dependencies] -configure_me_codegen = { workspace = true } tonic-build = { workspace = true } diff --git a/ballista/scheduler/build.rs b/ballista/scheduler/build.rs index 3c72b5ca..ae0369dd 100644 --- a/ballista/scheduler/build.rs +++ b/ballista/scheduler/build.rs @@ -16,13 +16,6 @@ // under the License. fn main() -> Result<(), String> { - #[cfg(feature = "build-binary")] - println!("cargo:rerun-if-changed=scheduler_config_spec.toml"); - - #[cfg(feature = "build-binary")] - configure_me_codegen::build_script_auto() - .map_err(|e| format!("configure_me code generation failed: {e}"))?; - #[cfg(feature = "keda-scaler")] println!("cargo:rerun-if-changed=proto/keda.proto"); diff --git a/ballista/scheduler/src/bin/main.rs b/ballista/scheduler/src/bin/main.rs index 90032e6d..d2465a16 100644 --- a/ballista/scheduler/src/bin/main.rs +++ b/ballista/scheduler/src/bin/main.rs @@ -22,10 +22,10 @@ use ballista_core::error::BallistaError; use ballista_core::object_store::{ session_config_with_s3_support, session_state_with_s3_support, }; -use ballista_core::print_version; use ballista_scheduler::cluster::BallistaCluster; -use ballista_scheduler::config::{Config, ResultExt, SchedulerConfig}; +use ballista_scheduler::config::{Config, SchedulerConfig}; use ballista_scheduler::scheduler_process::start_server; +use clap::Parser; use std::sync::Arc; use std::{env, io}; use tracing_subscriber::EnvFilter; @@ -42,14 +42,7 @@ fn main() -> ballista_core::error::Result<()> { } async fn inner() -> ballista_core::error::Result<()> { // parse options - let (opt, _remaining_args) = - Config::including_optional_config_files(&["/etc/ballista/scheduler.toml"]) - .unwrap_or_exit(); - - if opt.version { - print_version(); - std::process::exit(0); - } + let opt = Config::parse(); let rust_log = env::var(EnvFilter::DEFAULT_ENV); let log_filter = EnvFilter::new(rust_log.unwrap_or(opt.log_level_setting.clone())); diff --git a/ballista/scheduler/src/cluster/mod.rs b/ballista/scheduler/src/cluster/mod.rs index 2edf0792..64da28d2 100644 --- a/ballista/scheduler/src/cluster/mod.rs +++ b/ballista/scheduler/src/cluster/mod.rs @@ -54,7 +54,6 @@ pub mod memory; pub mod test_util; // an enum used to configure the backend -// needs to be visible to code generated by configure_me #[derive(Debug, Clone, serde::Deserialize, PartialEq, Eq)] #[cfg_attr(feature = "build-binary", derive(clap::ValueEnum))] pub enum ClusterStorage { @@ -69,12 +68,6 @@ impl std::str::FromStr for ClusterStorage { clap::ValueEnum::from_str(s, true) } } -#[cfg(feature = "build-binary")] -impl configure_me::parse_arg::ParseArgFromStr for ClusterStorage { - fn describe_type<W: std::fmt::Write>(mut writer: W) -> std::fmt::Result { - write!(writer, "The cluster storage backend for the scheduler") - } -} #[derive(Clone)] pub struct BallistaCluster { diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs index fbb53413..e02a6d61 100644 --- a/ballista/scheduler/src/config.rs +++ b/ballista/scheduler/src/config.rs @@ -30,14 +30,141 @@ use crate::SessionBuilder; use ballista_core::{config::TaskSchedulingPolicy, ConfigProducer}; use datafusion_proto::logical_plan::LogicalExtensionCodec; use datafusion_proto::physical_plan::PhysicalExtensionCodec; +use std::fmt::Display; use std::sync::Arc; +/// Configuration of the application #[cfg(feature = "build-binary")] - -include!(concat!( - env!("OUT_DIR"), - "/scheduler_configure_me_config.rs" -)); +#[derive(clap::Parser, Debug)] +#[command(version, about, long_about = None)] +pub struct Config { + #[arg( + long, + help = "Route for proxying flight results via scheduler. Should be of the form 'IP:PORT" + )] + pub advertise_flight_sql_endpoint: Option<String>, + #[arg(short = 'n', long, default_value_t = String::from("ballista"), help = "Namespace for the ballista cluster that this executor will join. Default: ballista")] + pub namespace: String, + #[arg(long, default_value_t = String::from("0.0.0.0"), help = "Local host name or IP address to bind to. Default: 0.0.0.0")] + pub bind_host: String, + #[arg(long, default_value_t = String::from("localhost"), help = "Host name or IP address so that executors can connect to this scheduler. Default: localhost")] + pub external_host: String, + #[arg( + short = 'p', + long, + default_value_t = 50050, + help = "bind port. Default: 50050" + )] + pub bind_port: u16, + #[arg( + short = 's', + long, + default_value_t = ballista_core::config::TaskSchedulingPolicy::PullStaged, + help = "The scheduling policy for the scheduler, possible values: pull-staged, push-staged. Default: pull-staged" + )] + pub scheduler_policy: ballista_core::config::TaskSchedulingPolicy, + #[arg( + long, + default_value_t = 1000, + help = "Event loop buffer size. Default: 10000" + )] + pub event_loop_buffer_size: u32, + #[arg( + long, + default_value_t = 300, + help = "Delayed interval for cleaning up finished job data. Default: 300" + )] + pub finished_job_data_clean_up_interval_seconds: u64, + #[arg( + long, + default_value_t = 3600, + help = "Delayed interval for cleaning up finished job state. Default: 3600" + )] + pub finished_job_state_clean_up_interval_seconds: u64, + #[arg( + long, + default_value_t = crate::config::TaskDistribution::Bias, + help = "The policy of distributing tasks to available executor slots, possible values: bias, round-robin, consistent-hash. Default: bias" + )] + pub task_distribution: crate::config::TaskDistribution, + #[arg( + long, + default_value_t = 31, + help = "Replica number of each node for the consistent hashing. Default: 31" + )] + pub consistent_hash_num_replicas: u32, + #[arg( + long, + default_value_t = 0, + help = "Tolerance of the consistent hashing policy for task scheduling. Default: 0" + )] + pub consistent_hash_tolerance: u32, + #[arg( + long, + help = "Log dir: a path to save log. This will create a new storage directory at the specified path if it does not already exist." + )] + pub log_dir: Option<String>, + #[arg( + long, + default_value_t = true, + help = "Enable print thread ids and names in log file." + )] + pub print_thread_info: bool, + #[arg( + long, + default_value_t = String::from("INFO,datafusion=INFO"), + help = "special log level for sub mod. link: https://docs.rs/env_logger/latest/env_logger/#enabling-logging. For example we want whole level is INFO but datafusion mode is DEBUG" + )] + pub log_level_setting: String, + #[arg( + long, + default_value_t = ballista_core::config::LogRotationPolicy::Daily, + help = "Tracing log rotation policy, possible values: minutely, hourly, daily, never. Default: daily" + )] + pub log_rotation_policy: ballista_core::config::LogRotationPolicy, + #[arg( + long, + default_value_t = 0, + help = "If job is not able to be scheduled on submission, wait for this interval and resubmit. Default value of 0 indicates that job should not be resubmitted" + )] + pub job_resubmit_interval_ms: u64, + #[arg( + long, + default_value_t = 30, + help = "Time in seconds an executor should be considered lost after it enters terminating status" + )] + pub executor_termination_grace_period: u64, + #[arg( + long, + default_value_t = 0, + help = "The maximum expected processing time of a scheduler event (microseconds). Zero means disable." + )] + pub scheduler_event_expected_processing_duration: u64, + #[arg( + long, + default_value_t = 16777216, + help = "The maximum size of a decoded message at the grpc server side. Default: 16MB" + )] + pub grpc_server_max_decoding_message_size: u32, + #[arg( + long, + default_value_t = 16777216, + help = "The maximum size of an encoded message at the grpc server side. Default: 16MB" + )] + pub grpc_server_max_encoding_message_size: u32, + #[arg( + long, + default_value_t = 180, + help = "The executor timeout in seconds. It should be longer than executor's heartbeat intervals. Only after missing two or tree consecutive heartbeats from a executor, the executor is mark to be dead" + )] + pub executor_timeout_seconds: u64, + #[arg( + long, + default_value_t = 15, + help = "The interval to check expired or dead executors" + )] + pub expire_dead_executor_interval_seconds: u64, +} /// Configurations for the ballista scheduler of scheduling jobs and tasks #[derive(Clone)] @@ -220,7 +347,6 @@ impl SchedulerConfig { /// Policy of distributing tasks to available executor slots /// -/// It needs to be visible to code generated by configure_me #[derive(Clone, Copy, Debug, serde::Deserialize)] #[cfg_attr(feature = "build-binary", derive(clap::ValueEnum))] pub enum TaskDistribution { @@ -237,6 +363,16 @@ pub enum TaskDistribution { ConsistentHash, } +impl Display for TaskDistribution { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + TaskDistribution::Bias => f.write_str("bias"), + TaskDistribution::RoundRobin => f.write_str("round-robin"), + TaskDistribution::ConsistentHash => f.write_str("consistent-hash"), + } + } +} + #[cfg(feature = "build-binary")] impl std::str::FromStr for TaskDistribution { type Err = String; @@ -246,13 +382,6 @@ impl std::str::FromStr for TaskDistribution { } } -#[cfg(feature = "build-binary")] -impl configure_me::parse_arg::ParseArgFromStr for TaskDistribution { - fn describe_type<W: std::fmt::Write>(mut writer: W) -> std::fmt::Result { - write!(writer, "The executor slots policy for the scheduler") - } -} - #[derive(Debug, Clone, Default)] pub enum TaskDistributionPolicy { /// Eagerly assign tasks to executor slots. This will assign as many task slots per executor --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org