This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 19d829f7 Deprecate `BallistaContext` (#1103)
19d829f7 is described below
commit 19d829f7313ef540ab2142f7044472b0d91c5d6b
Author: Marko Milenković <[email protected]>
AuthorDate: Thu Oct 31 19:40:42 2024 +0000
Deprecate `BallistaContext` (#1103)
---
ballista-cli/src/command.rs | 5 +--
ballista-cli/src/exec.rs | 18 ++++-------
ballista-cli/src/main.rs | 46 +++++++++++++++++++--------
ballista/client/src/context.rs | 3 +-
ballista/client/src/prelude.rs | 4 +--
benchmarks/src/bin/tpch.rs | 60 +++++++++++++++++++----------------
examples/examples/remote-dataframe.rs | 26 +++++++++------
examples/examples/remote-sql.rs | 28 ++++++++++------
examples/examples/standalone-sql.rs | 30 +++++++++++++-----
9 files changed, 136 insertions(+), 84 deletions(-)
diff --git a/ballista-cli/src/command.rs b/ballista-cli/src/command.rs
index 2123713a..e5d88729 100644
--- a/ballista-cli/src/command.rs
+++ b/ballista-cli/src/command.rs
@@ -21,11 +21,12 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::Instant;
-use ballista::prelude::{BallistaContext, BallistaError, Result};
+use ballista::prelude::{BallistaError, Result};
use datafusion::arrow::array::{ArrayRef, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::prelude::SessionContext;
use crate::functions::{display_all_functions, Function};
use crate::print_format::PrintFormat;
@@ -51,7 +52,7 @@ pub enum OutputFormat {
impl Command {
pub async fn execute(
&self,
- ctx: &BallistaContext,
+ ctx: &SessionContext,
print_options: &mut PrintOptions,
) -> Result<()> {
let now = Instant::now();
diff --git a/ballista-cli/src/exec.rs b/ballista-cli/src/exec.rs
index 4548d861..8795b4e6 100644
--- a/ballista-cli/src/exec.rs
+++ b/ballista-cli/src/exec.rs
@@ -23,7 +23,8 @@ use std::io::BufReader;
use std::sync::Arc;
use std::time::Instant;
-use ballista::prelude::{BallistaContext, Result};
+use ballista::prelude::Result;
+use datafusion::prelude::SessionContext;
use rustyline::error::ReadlineError;
use rustyline::Editor;
@@ -35,7 +36,7 @@ use crate::{
/// run and execute SQL statements and commands from a file, against a context with the given print options
pub async fn exec_from_lines(
- ctx: &BallistaContext,
+ ctx: &SessionContext,
reader: &mut BufReader<File>,
print_options: &PrintOptions,
) {
@@ -80,7 +81,7 @@ pub async fn exec_from_lines(
pub async fn exec_from_files(
files: Vec<String>,
- ctx: &BallistaContext,
+ ctx: &SessionContext,
print_options: &PrintOptions,
) {
let files = files
@@ -94,15 +95,10 @@ pub async fn exec_from_files(
}
/// run and execute SQL statements and commands against a context with the given print options
-pub async fn exec_from_repl(ctx: &BallistaContext, print_options: &mut PrintOptions) {
+pub async fn exec_from_repl(ctx: &SessionContext, print_options: &mut PrintOptions) {
let mut rl = Editor::new().expect("created editor");
rl.set_helper(Some(CliHelper::new(
- &ctx.context()
- .task_ctx()
- .session_config()
- .options()
- .sql_parser
- .dialect,
+ &ctx.task_ctx().session_config().options().sql_parser.dialect,
print_options.color,
)));
rl.load_history(".history").ok();
@@ -171,7 +167,7 @@ pub async fn exec_from_repl(ctx: &BallistaContext, print_options: &mut PrintOpti
}
async fn exec_and_print(
- ctx: &BallistaContext,
+ ctx: &SessionContext,
print_options: &PrintOptions,
sql: String,
) -> Result<()> {
diff --git a/ballista-cli/src/main.rs b/ballista-cli/src/main.rs
index 0fd6ddfd..ec0c5bbc 100644
--- a/ballista-cli/src/main.rs
+++ b/ballista-cli/src/main.rs
@@ -18,11 +18,21 @@
use std::env;
use std::path::Path;
-use ballista::prelude::{BallistaConfig, BallistaContext, Result};
+use ballista::{
+ extension::BallistaSessionConfigExt,
+ prelude::{
+ Result, SessionContextExt, BALLISTA_DEFAULT_BATCH_SIZE,
+ BALLISTA_STANDALONE_PARALLELISM, BALLISTA_WITH_INFORMATION_SCHEMA,
+ },
+};
use ballista_cli::{
exec, print_format::PrintFormat, print_options::PrintOptions,
BALLISTA_CLI_VERSION,
};
use clap::Parser;
+use datafusion::{
+ execution::SessionStateBuilder,
+ prelude::{SessionConfig, SessionContext},
+};
use datafusion_cli::print_options::MaxRows;
use mimalloc::MiMalloc;
@@ -108,29 +118,39 @@ pub async fn main() -> Result<()> {
env::set_current_dir(p).unwrap();
};
- let mut ballista_config_builder =
- BallistaConfig::builder().set("ballista.with_information_schema", "true");
+ let mut ballista_config = SessionConfig::new_with_ballista()
+ .set_str(BALLISTA_WITH_INFORMATION_SCHEMA, "true");
if let Some(batch_size) = args.batch_size {
- ballista_config_builder =
- ballista_config_builder.set("ballista.batch.size", &batch_size.to_string());
+ ballista_config =
+ ballista_config.set_str(BALLISTA_DEFAULT_BATCH_SIZE, &batch_size.to_string());
};
- let ballista_config = ballista_config_builder.build()?;
-
let ctx = match (args.host, args.port) {
(Some(ref host), Some(port)) => {
+ let address = format!("df://{}:{}", host, port);
+ let state = SessionStateBuilder::new()
+ .with_config(ballista_config)
+ .with_default_features()
+ .build();
+
// Distributed execution with Ballista Remote
- BallistaContext::remote(host, port, &ballista_config).await?
+ SessionContext::remote_with_state(&address, state).await?
}
_ => {
- let concurrent_tasks = if let Some(concurrent_tasks) = args.concurrent_tasks {
- concurrent_tasks
- } else {
- num_cpus::get()
+ if let Some(concurrent_tasks) = args.concurrent_tasks {
+ ballista_config = ballista_config.set_str(
+ BALLISTA_STANDALONE_PARALLELISM,
+ &concurrent_tasks.to_string(),
+ );
};
+ let state = SessionStateBuilder::new()
+ .with_config(ballista_config)
+ .with_default_features()
+ .build();
+
// In-process execution with Ballista Standalone
- BallistaContext::standalone(&ballista_config, concurrent_tasks).await?
+ SessionContext::standalone_with_state(state).await?
}
};
diff --git a/ballista/client/src/context.rs b/ballista/client/src/context.rs
index b09e1d65..453296c2 100644
--- a/ballista/client/src/context.rs
+++ b/ballista/client/src/context.rs
@@ -16,6 +16,7 @@
// under the License.
//! Distributed execution context.
+#![allow(deprecated)] // TO BE REMOVED
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::execution::context::DataFilePaths;
@@ -76,7 +77,7 @@ impl BallistaContextState {
}
}
-// #[deprecated]
+#[deprecated]
pub struct BallistaContext {
state: Arc<Mutex<BallistaContextState>>,
context: Arc<SessionContext>,
diff --git a/ballista/client/src/prelude.rs b/ballista/client/src/prelude.rs
index 1b798877..d06a6530 100644
--- a/ballista/client/src/prelude.rs
+++ b/ballista/client/src/prelude.rs
@@ -28,7 +28,7 @@ pub use ballista_core::{
error::{BallistaError, Result},
};
-pub use futures::StreamExt;
-
+#[allow(deprecated)] // TO BE REMOVED
pub use crate::context::BallistaContext;
pub use crate::extension::SessionContextExt;
+pub use futures::StreamExt;
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 01dcade1..c3806277 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -18,9 +18,9 @@
//! Benchmark derived from TPC-H. This is not an official TPC-H benchmark.
use arrow_schema::SchemaBuilder;
-use ballista::context::BallistaContext;
+use ballista::extension::BallistaSessionConfigExt;
use ballista::prelude::{
- BallistaConfig, BALLISTA_COLLECT_STATISTICS, BALLISTA_DEFAULT_BATCH_SIZE,
+ SessionContextExt, BALLISTA_COLLECT_STATISTICS, BALLISTA_DEFAULT_BATCH_SIZE,
BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, BALLISTA_JOB_NAME,
};
use datafusion::arrow::array::*;
@@ -30,6 +30,7 @@ use datafusion::datasource::listing::ListingTableUrl;
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::SessionState;
+use datafusion::execution::SessionStateBuilder;
use datafusion::logical_expr::LogicalPlan;
use datafusion::logical_expr::{expr::Cast, Expr};
use datafusion::parquet::basic::Compression;
@@ -354,24 +355,27 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
println!("Running benchmarks with the following options: {opt:?}");
let mut benchmark_run = BenchmarkRun::new(opt.query);
- let config = BallistaConfig::builder()
- .set(
+ let config = SessionConfig::new_with_ballista()
+ .set_str(
BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
&format!("{}", opt.partitions),
)
- .set(
+ .set_str(
BALLISTA_JOB_NAME,
&format!("Query derived from TPC-H q{}", opt.query),
)
- .set(BALLISTA_DEFAULT_BATCH_SIZE, &format!("{}", opt.batch_size))
- .set(BALLISTA_COLLECT_STATISTICS, "true")
- .build()
- .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
-
- let ctx =
- BallistaContext::remote(opt.host.unwrap().as_str(), opt.port.unwrap(), &config)
- .await
- .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
+ .set_str(BALLISTA_DEFAULT_BATCH_SIZE, &format!("{}", opt.batch_size))
+ .set_str(BALLISTA_COLLECT_STATISTICS, "true");
+ let state = SessionStateBuilder::new()
+ .with_default_features()
+ .with_config(config)
+ .build();
+ let address = format!(
+ "df://{}:{}",
+ opt.host.clone().unwrap().as_str(),
+ opt.port.unwrap()
+ );
+ let ctx = SessionContext::remote_with_state(&address, state).await?;
// register tables with Ballista context
let path = opt.path.to_str().unwrap();
@@ -454,29 +458,29 @@ fn write_summary_json(benchmark_run: &mut BenchmarkRun, path: &Path) -> Result<(
async fn loadtest_ballista(opt: BallistaLoadtestOpt) -> Result<()> {
println!("Running loadtest_ballista with the following options: {opt:?}");
- let config = BallistaConfig::builder()
- .set(
+ let config = SessionConfig::new_with_ballista()
+ .set_str(
BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
&format!("{}", opt.partitions),
)
- .set(BALLISTA_DEFAULT_BATCH_SIZE, &format!("{}", opt.batch_size))
- .build()
- .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
+ .set_str(BALLISTA_DEFAULT_BATCH_SIZE, &format!("{}", opt.batch_size));
+
+ let state = SessionStateBuilder::new()
+ .with_default_features()
+ .with_config(config)
+ .build();
let concurrency = opt.concurrency;
let request_amount = opt.requests;
let mut clients = vec![];
for _num in 0..concurrency {
- clients.push(
- BallistaContext::remote(
- opt.host.clone().unwrap().as_str(),
- opt.port.unwrap(),
- &config,
- )
- .await
- .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?,
+ let address = format!(
+ "df://{}:{}",
+ opt.host.clone().unwrap().as_str(),
+ opt.port.unwrap()
);
+ clients.push(SessionContext::remote_with_state(&address, state.clone()).await?);
}
// register tables with Ballista context
@@ -566,7 +570,7 @@ fn get_query_sql_by_path(query: usize, mut sql_path: String) -> Result<String> {
async fn register_tables(
path: &str,
file_format: &str,
- ctx: &BallistaContext,
+ ctx: &SessionContext,
debug: bool,
) -> Result<()> {
for table in TABLES {
diff --git a/examples/examples/remote-dataframe.rs b/examples/examples/remote-dataframe.rs
index 6b190cea..ae6d6531 100644
--- a/examples/examples/remote-dataframe.rs
+++ b/examples/examples/remote-dataframe.rs
@@ -15,28 +15,36 @@
// specific language governing permissions and limitations
// under the License.
-use ballista::prelude::*;
-use datafusion::prelude::{col, lit, ParquetReadOptions};
+use ballista::{extension::BallistaSessionConfigExt, prelude::*};
+use ballista_examples::test_util;
+use datafusion::{
+ execution::SessionStateBuilder,
+ prelude::{col, lit, ParquetReadOptions, SessionConfig, SessionContext},
+};
/// This example demonstrates executing a simple query against an Arrow data source (Parquet) and
/// fetching results, using the DataFrame trait
#[tokio::main]
async fn main() -> Result<()> {
- let config = BallistaConfig::builder()
- .set("ballista.shuffle.partitions", "4")
- .build()?;
- let ctx = BallistaContext::remote("localhost", 50050, &config).await?;
+ let config = SessionConfig::new_with_ballista()
+ .set_str(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "4");
- let filename = "testdata/alltypes_plain.parquet";
+ let state = SessionStateBuilder::new()
+ .with_config(config)
+ .with_default_features()
+ .build();
+
+ let ctx = SessionContext::remote_with_state("df://localhost:50050", state).await?;
+
+ let test_data = test_util::examples_test_data();
+ let filename = format!("{test_data}/alltypes_plain.parquet");
- // define the query using the DataFrame trait
let df = ctx
.read_parquet(filename, ParquetReadOptions::default())
.await?
.select_columns(&["id", "bool_col", "timestamp_col"])?
.filter(col("id").gt(lit(1)))?;
- // print the results
df.show().await?;
Ok(())
diff --git a/examples/examples/remote-sql.rs b/examples/examples/remote-sql.rs
index f8afad56..26ed11bc 100644
--- a/examples/examples/remote-sql.rs
+++ b/examples/examples/remote-sql.rs
@@ -15,27 +15,36 @@
// specific language governing permissions and limitations
// under the License.
-use ballista::prelude::*;
-use datafusion::prelude::CsvReadOptions;
+use ballista::{extension::BallistaSessionConfigExt, prelude::*};
+use ballista_examples::test_util;
+use datafusion::{
+ execution::SessionStateBuilder,
+ prelude::{CsvReadOptions, SessionConfig, SessionContext},
+};
/// This example demonstrates executing a simple query against an Arrow data source (CSV) and
/// fetching results, using SQL
#[tokio::main]
async fn main() -> Result<()> {
- let config = BallistaConfig::builder()
- .set("ballista.shuffle.partitions", "4")
- .build()?;
- let ctx = BallistaContext::remote("localhost", 50050, &config).await?;
+ let config = SessionConfig::new_with_ballista()
+ .set_str(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "4");
+
+ let state = SessionStateBuilder::new()
+ .with_config(config)
+ .with_default_features()
+ .build();
+
+ let ctx = SessionContext::remote_with_state("df://localhost:50050", state).await?;
+
+ let test_data = test_util::examples_test_data();
- // register csv file with the execution context
ctx.register_csv(
"test",
- "testdata/aggregate_test_100.csv",
+ &format!("{test_data}/aggregate_test_100.csv"),
CsvReadOptions::new(),
)
.await?;
- // execute the query
let df = ctx
.sql(
"SELECT c1, MIN(c12), MAX(c12) \
@@ -45,7 +54,6 @@ async fn main() -> Result<()> {
)
.await?;
- // print the results
df.show().await?;
Ok(())
diff --git a/examples/examples/standalone-sql.rs b/examples/examples/standalone-sql.rs
index 0427caa7..d1e637e7 100644
--- a/examples/examples/standalone-sql.rs
+++ b/examples/examples/standalone-sql.rs
@@ -15,24 +15,38 @@
// specific language governing permissions and limitations
// under the License.
-use ballista::prelude::{BallistaConfig, BallistaContext, Result};
+use ballista::{
+ extension::BallistaSessionConfigExt,
+ prelude::{
+ Result, SessionContextExt, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
+ BALLISTA_STANDALONE_PARALLELISM,
+ },
+};
use ballista_examples::test_util;
-use datafusion::execution::options::ParquetReadOptions;
+use datafusion::{
+ execution::{options::ParquetReadOptions, SessionStateBuilder},
+ prelude::{SessionConfig, SessionContext},
+};
#[tokio::main]
async fn main() -> Result<()> {
- let config = BallistaConfig::builder()
- .set("ballista.shuffle.partitions", "1")
- .build()?;
+ let config = SessionConfig::new_with_ballista()
+ .set_str(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "1")
+ .set_str(BALLISTA_STANDALONE_PARALLELISM, "2");
- let ctx = BallistaContext::standalone(&config, 2).await?;
+ let state = SessionStateBuilder::new()
+ .with_config(config)
+ .with_default_features()
+ .build();
- let testdata = test_util::examples_test_data();
+ let ctx = SessionContext::standalone_with_state(state).await?;
+
+ let test_data = test_util::examples_test_data();
// register parquet file with the execution context
ctx.register_parquet(
"test",
- &format!("{testdata}/alltypes_plain.parquet"),
+ &format!("{test_data}/alltypes_plain.parquet"),
ParquetReadOptions::default(),
)
.await?;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]