This is an automated email from the ASF dual-hosted git repository. agrove pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push: new 19d829f7 Deprecate `BallistaContext` (#1103) 19d829f7 is described below commit 19d829f7313ef540ab2142f7044472b0d91c5d6b Author: Marko Milenković <milenkov...@users.noreply.github.com> AuthorDate: Thu Oct 31 19:40:42 2024 +0000 Deprecate `BallistaContext` (#1103) --- ballista-cli/src/command.rs | 5 +-- ballista-cli/src/exec.rs | 18 ++++------- ballista-cli/src/main.rs | 46 +++++++++++++++++++-------- ballista/client/src/context.rs | 3 +- ballista/client/src/prelude.rs | 4 +-- benchmarks/src/bin/tpch.rs | 60 +++++++++++++++++++---------------- examples/examples/remote-dataframe.rs | 26 +++++++++------ examples/examples/remote-sql.rs | 28 ++++++++++------ examples/examples/standalone-sql.rs | 30 +++++++++++++----- 9 files changed, 136 insertions(+), 84 deletions(-) diff --git a/ballista-cli/src/command.rs b/ballista-cli/src/command.rs index 2123713a..e5d88729 100644 --- a/ballista-cli/src/command.rs +++ b/ballista-cli/src/command.rs @@ -21,11 +21,12 @@ use std::str::FromStr; use std::sync::Arc; use std::time::Instant; -use ballista::prelude::{BallistaContext, BallistaError, Result}; +use ballista::prelude::{BallistaError, Result}; use datafusion::arrow::array::{ArrayRef, StringArray}; use datafusion::arrow::datatypes::{DataType, Field, Schema}; use datafusion::arrow::record_batch::RecordBatch; +use datafusion::prelude::SessionContext; use crate::functions::{display_all_functions, Function}; use crate::print_format::PrintFormat; @@ -51,7 +52,7 @@ pub enum OutputFormat { impl Command { pub async fn execute( &self, - ctx: &BallistaContext, + ctx: &SessionContext, print_options: &mut PrintOptions, ) -> Result<()> { let now = Instant::now(); diff --git a/ballista-cli/src/exec.rs b/ballista-cli/src/exec.rs index 4548d861..8795b4e6 100644 --- a/ballista-cli/src/exec.rs +++ b/ballista-cli/src/exec.rs @@ -23,7 +23,8 @@ use std::io::BufReader; use std::sync::Arc; use std::time::Instant; -use ballista::prelude::{BallistaContext, Result}; +use ballista::prelude::Result; +use datafusion::prelude::SessionContext; use rustyline::error::ReadlineError; use rustyline::Editor; @@ -35,7 +36,7 @@ use crate::{ /// run and execute SQL statements and commands from a file, against a context with the given print options pub async fn exec_from_lines( - ctx: &BallistaContext, + ctx: &SessionContext, reader: &mut BufReader<File>, print_options: &PrintOptions, ) { @@ -80,7 +81,7 @@ pub async fn exec_from_lines( pub async fn exec_from_files( files: Vec<String>, - ctx: &BallistaContext, + ctx: &SessionContext, print_options: &PrintOptions, ) { let files = files @@ -94,15 +95,10 @@ pub async fn exec_from_files( } /// run and execute SQL statements and commands against a context with the given print options -pub async fn exec_from_repl(ctx: &BallistaContext, print_options: &mut PrintOptions) { +pub async fn exec_from_repl(ctx: &SessionContext, print_options: &mut PrintOptions) { let mut rl = Editor::new().expect("created editor"); rl.set_helper(Some(CliHelper::new( - &ctx.context() - .task_ctx() - .session_config() - .options() - .sql_parser - .dialect, + &ctx.task_ctx().session_config().options().sql_parser.dialect, print_options.color, ))); rl.load_history(".history").ok(); @@ -171,7 +167,7 @@ pub async fn exec_from_repl(ctx: &BallistaContext, print_options: &mut PrintOpti } async fn exec_and_print( - ctx: &BallistaContext, + ctx: &SessionContext, print_options: &PrintOptions, sql: String, ) -> Result<()> { diff --git a/ballista-cli/src/main.rs b/ballista-cli/src/main.rs index 0fd6ddfd..ec0c5bbc 100644 --- a/ballista-cli/src/main.rs +++ b/ballista-cli/src/main.rs @@ -18,11 +18,21 @@ use std::env; use std::path::Path; -use ballista::prelude::{BallistaConfig, BallistaContext, Result}; +use ballista::{ + extension::BallistaSessionConfigExt, + prelude::{ + Result, SessionContextExt, BALLISTA_DEFAULT_BATCH_SIZE, + BALLISTA_STANDALONE_PARALLELISM, BALLISTA_WITH_INFORMATION_SCHEMA, + }, +}; use ballista_cli::{ exec, print_format::PrintFormat, print_options::PrintOptions, BALLISTA_CLI_VERSION, }; use clap::Parser; +use datafusion::{ + execution::SessionStateBuilder, + prelude::{SessionConfig, SessionContext}, +}; use datafusion_cli::print_options::MaxRows; use mimalloc::MiMalloc; @@ -108,29 +118,39 @@ pub async fn main() -> Result<()> { env::set_current_dir(p).unwrap(); }; - let mut ballista_config_builder = - BallistaConfig::builder().set("ballista.with_information_schema", "true"); + let mut ballista_config = SessionConfig::new_with_ballista() + .set_str(BALLISTA_WITH_INFORMATION_SCHEMA, "true"); if let Some(batch_size) = args.batch_size { - ballista_config_builder = - ballista_config_builder.set("ballista.batch.size", &batch_size.to_string()); + ballista_config = + ballista_config.set_str(BALLISTA_DEFAULT_BATCH_SIZE, &batch_size.to_string()); }; - let ballista_config = ballista_config_builder.build()?; - let ctx = match (args.host, args.port) { (Some(ref host), Some(port)) => { + let address = format!("df://{}:{}", host, port); + let state = SessionStateBuilder::new() + .with_config(ballista_config) + .with_default_features() + .build(); + // Distributed execution with Ballista Remote - BallistaContext::remote(host, port, &ballista_config).await? + SessionContext::remote_with_state(&address, state).await? } _ => { - let concurrent_tasks = if let Some(concurrent_tasks) = args.concurrent_tasks { - concurrent_tasks - } else { - num_cpus::get() + if let Some(concurrent_tasks) = args.concurrent_tasks { + ballista_config = ballista_config.set_str( + BALLISTA_STANDALONE_PARALLELISM, + &concurrent_tasks.to_string(), + ); }; + let state = SessionStateBuilder::new() + .with_config(ballista_config) + .with_default_features() + .build(); + // In-process execution with Ballista Standalone - BallistaContext::standalone(&ballista_config, concurrent_tasks).await? + SessionContext::standalone_with_state(state).await? } }; diff --git a/ballista/client/src/context.rs b/ballista/client/src/context.rs index b09e1d65..453296c2 100644 --- a/ballista/client/src/context.rs +++ b/ballista/client/src/context.rs @@ -16,6 +16,7 @@ // under the License. //! Distributed execution context. +#![allow(deprecated)] // TO BE REMOVED use datafusion::arrow::datatypes::SchemaRef; use datafusion::execution::context::DataFilePaths; @@ -76,7 +77,7 @@ impl BallistaContextState { } } -// #[deprecated] +#[deprecated] pub struct BallistaContext { state: Arc<Mutex<BallistaContextState>>, context: Arc<SessionContext>, diff --git a/ballista/client/src/prelude.rs b/ballista/client/src/prelude.rs index 1b798877..d06a6530 100644 --- a/ballista/client/src/prelude.rs +++ b/ballista/client/src/prelude.rs @@ -28,7 +28,7 @@ pub use ballista_core::{ error::{BallistaError, Result}, }; -pub use futures::StreamExt; - +#[allow(deprecated)] // TO BE REMOVED pub use crate::context::BallistaContext; pub use crate::extension::SessionContextExt; +pub use futures::StreamExt; diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index 01dcade1..c3806277 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -18,9 +18,9 @@ //! Benchmark derived from TPC-H. This is not an official TPC-H benchmark. use arrow_schema::SchemaBuilder; -use ballista::context::BallistaContext; +use ballista::extension::BallistaSessionConfigExt; use ballista::prelude::{ - BallistaConfig, BALLISTA_COLLECT_STATISTICS, BALLISTA_DEFAULT_BATCH_SIZE, + SessionContextExt, BALLISTA_COLLECT_STATISTICS, BALLISTA_DEFAULT_BATCH_SIZE, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, BALLISTA_JOB_NAME, }; use datafusion::arrow::array::*; @@ -30,6 +30,7 @@ use datafusion::datasource::listing::ListingTableUrl; use datafusion::datasource::{MemTable, TableProvider}; use datafusion::error::{DataFusionError, Result}; use datafusion::execution::context::SessionState; +use datafusion::execution::SessionStateBuilder; use datafusion::logical_expr::LogicalPlan; use datafusion::logical_expr::{expr::Cast, Expr}; use datafusion::parquet::basic::Compression; @@ -354,24 +355,27 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> { println!("Running benchmarks with the following options: {opt:?}"); let mut benchmark_run = BenchmarkRun::new(opt.query); - let config = BallistaConfig::builder() - .set( + let config = SessionConfig::new_with_ballista() + .set_str( BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, &format!("{}", opt.partitions), ) - .set( + .set_str( BALLISTA_JOB_NAME, &format!("Query derived from TPC-H q{}", opt.query), ) - .set(BALLISTA_DEFAULT_BATCH_SIZE, &format!("{}", opt.batch_size)) - .set(BALLISTA_COLLECT_STATISTICS, "true") - .build() - .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?; - - let ctx = - BallistaContext::remote(opt.host.unwrap().as_str(), opt.port.unwrap(), &config) - .await - .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?; + .set_str(BALLISTA_DEFAULT_BATCH_SIZE, &format!("{}", opt.batch_size)) + .set_str(BALLISTA_COLLECT_STATISTICS, "true"); + let state = SessionStateBuilder::new() + .with_default_features() + .with_config(config) + .build(); + let address = format!( + "df://{}:{}", + opt.host.clone().unwrap().as_str(), + opt.port.unwrap() + ); + let ctx = SessionContext::remote_with_state(&address, state).await?; // register tables with Ballista context let path = opt.path.to_str().unwrap(); @@ -454,29 +458,29 @@ fn write_summary_json(benchmark_run: &mut BenchmarkRun, path: &Path) -> Result<( async fn loadtest_ballista(opt: BallistaLoadtestOpt) -> Result<()> { println!("Running loadtest_ballista with the following options: {opt:?}"); - let config = BallistaConfig::builder() - .set( + let config = SessionConfig::new_with_ballista() + .set_str( BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, &format!("{}", opt.partitions), ) - .set(BALLISTA_DEFAULT_BATCH_SIZE, &format!("{}", opt.batch_size)) - .build() - .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?; + .set_str(BALLISTA_DEFAULT_BATCH_SIZE, &format!("{}", opt.batch_size)); + + let state = SessionStateBuilder::new() + .with_default_features() + .with_config(config) + .build(); let concurrency = opt.concurrency; let request_amount = opt.requests; let mut clients = vec![]; for _num in 0..concurrency { - clients.push( - BallistaContext::remote( - opt.host.clone().unwrap().as_str(), - opt.port.unwrap(), - &config, - ) - .await - .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?, + let address = format!( + "df://{}:{}", + opt.host.clone().unwrap().as_str(), + opt.port.unwrap() ); + clients.push(SessionContext::remote_with_state(&address, state.clone()).await?); } // register tables with Ballista context @@ -566,7 +570,7 @@ fn get_query_sql_by_path(query: usize, mut sql_path: String) -> Result<String> { async fn register_tables( path: &str, file_format: &str, - ctx: &BallistaContext, + ctx: &SessionContext, debug: bool, ) -> Result<()> { for table in TABLES { diff --git a/examples/examples/remote-dataframe.rs b/examples/examples/remote-dataframe.rs index 6b190cea..ae6d6531 100644 --- a/examples/examples/remote-dataframe.rs +++ b/examples/examples/remote-dataframe.rs @@ -15,28 +15,36 @@ // specific language governing permissions and limitations // under the License. -use ballista::prelude::*; -use datafusion::prelude::{col, lit, ParquetReadOptions}; +use ballista::{extension::BallistaSessionConfigExt, prelude::*}; +use ballista_examples::test_util; +use datafusion::{ + execution::SessionStateBuilder, + prelude::{col, lit, ParquetReadOptions, SessionConfig, SessionContext}, +}; /// This example demonstrates executing a simple query against an Arrow data source (Parquet) and /// fetching results, using the DataFrame trait #[tokio::main] async fn main() -> Result<()> { - let config = BallistaConfig::builder() - .set("ballista.shuffle.partitions", "4") - .build()?; - let ctx = BallistaContext::remote("localhost", 50050, &config).await?; + let config = SessionConfig::new_with_ballista() + .set_str(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "4"); - let filename = "testdata/alltypes_plain.parquet"; + let state = SessionStateBuilder::new() + .with_config(config) + .with_default_features() + .build(); + + let ctx = SessionContext::remote_with_state("df://localhost:50050", state).await?; + + let test_data = test_util::examples_test_data(); + let filename = format!("{test_data}/alltypes_plain.parquet"); - // define the query using the DataFrame trait let df = ctx .read_parquet(filename, ParquetReadOptions::default()) .await? .select_columns(&["id", "bool_col", "timestamp_col"])? .filter(col("id").gt(lit(1)))?; - // print the results df.show().await?; Ok(()) diff --git a/examples/examples/remote-sql.rs b/examples/examples/remote-sql.rs index f8afad56..26ed11bc 100644 --- a/examples/examples/remote-sql.rs +++ b/examples/examples/remote-sql.rs @@ -15,27 +15,36 @@ // specific language governing permissions and limitations // under the License. -use ballista::prelude::*; -use datafusion::prelude::CsvReadOptions; +use ballista::{extension::BallistaSessionConfigExt, prelude::*}; +use ballista_examples::test_util; +use datafusion::{ + execution::SessionStateBuilder, + prelude::{CsvReadOptions, SessionConfig, SessionContext}, +}; /// This example demonstrates executing a simple query against an Arrow data source (CSV) and /// fetching results, using SQL #[tokio::main] async fn main() -> Result<()> { - let config = BallistaConfig::builder() - .set("ballista.shuffle.partitions", "4") - .build()?; - let ctx = BallistaContext::remote("localhost", 50050, &config).await?; + let config = SessionConfig::new_with_ballista() + .set_str(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "4"); + + let state = SessionStateBuilder::new() + .with_config(config) + .with_default_features() + .build(); + + let ctx = SessionContext::remote_with_state("df://localhost:50050", state).await?; + + let test_data = test_util::examples_test_data(); - // register csv file with the execution context ctx.register_csv( "test", - "testdata/aggregate_test_100.csv", + &format!("{test_data}/aggregate_test_100.csv"), CsvReadOptions::new(), ) .await?; - // execute the query let df = ctx .sql( "SELECT c1, MIN(c12), MAX(c12) \ @@ -45,7 +54,6 @@ async fn main() -> Result<()> { ) .await?; - // print the results df.show().await?; Ok(()) diff --git a/examples/examples/standalone-sql.rs b/examples/examples/standalone-sql.rs index 0427caa7..d1e637e7 100644 --- a/examples/examples/standalone-sql.rs +++ b/examples/examples/standalone-sql.rs @@ -15,24 +15,38 @@ // specific language governing permissions and limitations // under the License. -use ballista::prelude::{BallistaConfig, BallistaContext, Result}; +use ballista::{ + extension::BallistaSessionConfigExt, + prelude::{ + Result, SessionContextExt, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, + BALLISTA_STANDALONE_PARALLELISM, + }, +}; use ballista_examples::test_util; -use datafusion::execution::options::ParquetReadOptions; +use datafusion::{ + execution::{options::ParquetReadOptions, SessionStateBuilder}, + prelude::{SessionConfig, SessionContext}, +}; #[tokio::main] async fn main() -> Result<()> { - let config = BallistaConfig::builder() - .set("ballista.shuffle.partitions", "1") - .build()?; + let config = SessionConfig::new_with_ballista() + .set_str(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "1") + .set_str(BALLISTA_STANDALONE_PARALLELISM, "2"); - let ctx = BallistaContext::standalone(&config, 2).await?; + let state = SessionStateBuilder::new() + .with_config(config) + .with_default_features() + .build(); - let testdata = test_util::examples_test_data(); + let ctx = SessionContext::standalone_with_state(state).await?; + + let test_data = test_util::examples_test_data(); // register parquet file with the execution context ctx.register_parquet( "test", - &format!("{testdata}/alltypes_plain.parquet"), + &format!("{test_data}/alltypes_plain.parquet"), ParquetReadOptions::default(), ) .await?; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org