This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 19d829f7 Deprecate `BallistaContext` (#1103)
19d829f7 is described below

commit 19d829f7313ef540ab2142f7044472b0d91c5d6b
Author: Marko Milenković <milenkov...@users.noreply.github.com>
AuthorDate: Thu Oct 31 19:40:42 2024 +0000

    Deprecate `BallistaContext` (#1103)
---
 ballista-cli/src/command.rs           |  5 +--
 ballista-cli/src/exec.rs              | 18 ++++-------
 ballista-cli/src/main.rs              | 46 +++++++++++++++++++--------
 ballista/client/src/context.rs        |  3 +-
 ballista/client/src/prelude.rs        |  4 +--
 benchmarks/src/bin/tpch.rs            | 60 +++++++++++++++++++----------------
 examples/examples/remote-dataframe.rs | 26 +++++++++------
 examples/examples/remote-sql.rs       | 28 ++++++++++------
 examples/examples/standalone-sql.rs   | 30 +++++++++++++-----
 9 files changed, 136 insertions(+), 84 deletions(-)

diff --git a/ballista-cli/src/command.rs b/ballista-cli/src/command.rs
index 2123713a..e5d88729 100644
--- a/ballista-cli/src/command.rs
+++ b/ballista-cli/src/command.rs
@@ -21,11 +21,12 @@ use std::str::FromStr;
 use std::sync::Arc;
 use std::time::Instant;
 
-use ballista::prelude::{BallistaContext, BallistaError, Result};
+use ballista::prelude::{BallistaError, Result};
 
 use datafusion::arrow::array::{ArrayRef, StringArray};
 use datafusion::arrow::datatypes::{DataType, Field, Schema};
 use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::prelude::SessionContext;
 
 use crate::functions::{display_all_functions, Function};
 use crate::print_format::PrintFormat;
@@ -51,7 +52,7 @@ pub enum OutputFormat {
 impl Command {
     pub async fn execute(
         &self,
-        ctx: &BallistaContext,
+        ctx: &SessionContext,
         print_options: &mut PrintOptions,
     ) -> Result<()> {
         let now = Instant::now();
diff --git a/ballista-cli/src/exec.rs b/ballista-cli/src/exec.rs
index 4548d861..8795b4e6 100644
--- a/ballista-cli/src/exec.rs
+++ b/ballista-cli/src/exec.rs
@@ -23,7 +23,8 @@ use std::io::BufReader;
 use std::sync::Arc;
 use std::time::Instant;
 
-use ballista::prelude::{BallistaContext, Result};
+use ballista::prelude::Result;
+use datafusion::prelude::SessionContext;
 use rustyline::error::ReadlineError;
 use rustyline::Editor;
 
@@ -35,7 +36,7 @@ use crate::{
 
 /// run and execute SQL statements and commands from a file, against a context with the given print options
 pub async fn exec_from_lines(
-    ctx: &BallistaContext,
+    ctx: &SessionContext,
     reader: &mut BufReader<File>,
     print_options: &PrintOptions,
 ) {
@@ -80,7 +81,7 @@ pub async fn exec_from_lines(
 
 pub async fn exec_from_files(
     files: Vec<String>,
-    ctx: &BallistaContext,
+    ctx: &SessionContext,
     print_options: &PrintOptions,
 ) {
     let files = files
@@ -94,15 +95,10 @@ pub async fn exec_from_files(
 }
 
 /// run and execute SQL statements and commands against a context with the given print options
-pub async fn exec_from_repl(ctx: &BallistaContext, print_options: &mut PrintOptions) {
+pub async fn exec_from_repl(ctx: &SessionContext, print_options: &mut PrintOptions) {
     let mut rl = Editor::new().expect("created editor");
     rl.set_helper(Some(CliHelper::new(
-        &ctx.context()
-            .task_ctx()
-            .session_config()
-            .options()
-            .sql_parser
-            .dialect,
+        &ctx.task_ctx().session_config().options().sql_parser.dialect,
         print_options.color,
     )));
     rl.load_history(".history").ok();
@@ -171,7 +167,7 @@ pub async fn exec_from_repl(ctx: &BallistaContext, print_options: &mut PrintOpti
 }
 
 async fn exec_and_print(
-    ctx: &BallistaContext,
+    ctx: &SessionContext,
     print_options: &PrintOptions,
     sql: String,
 ) -> Result<()> {
diff --git a/ballista-cli/src/main.rs b/ballista-cli/src/main.rs
index 0fd6ddfd..ec0c5bbc 100644
--- a/ballista-cli/src/main.rs
+++ b/ballista-cli/src/main.rs
@@ -18,11 +18,21 @@
 use std::env;
 use std::path::Path;
 
-use ballista::prelude::{BallistaConfig, BallistaContext, Result};
+use ballista::{
+    extension::BallistaSessionConfigExt,
+    prelude::{
+        Result, SessionContextExt, BALLISTA_DEFAULT_BATCH_SIZE,
+        BALLISTA_STANDALONE_PARALLELISM, BALLISTA_WITH_INFORMATION_SCHEMA,
+    },
+};
 use ballista_cli::{
     exec, print_format::PrintFormat, print_options::PrintOptions, BALLISTA_CLI_VERSION,
 };
 use clap::Parser;
+use datafusion::{
+    execution::SessionStateBuilder,
+    prelude::{SessionConfig, SessionContext},
+};
 use datafusion_cli::print_options::MaxRows;
 use mimalloc::MiMalloc;
 
@@ -108,29 +118,39 @@ pub async fn main() -> Result<()> {
         env::set_current_dir(p).unwrap();
     };
 
-    let mut ballista_config_builder =
-        BallistaConfig::builder().set("ballista.with_information_schema", "true");
+    let mut ballista_config = SessionConfig::new_with_ballista()
+        .set_str(BALLISTA_WITH_INFORMATION_SCHEMA, "true");
 
     if let Some(batch_size) = args.batch_size {
-        ballista_config_builder =
-            ballista_config_builder.set("ballista.batch.size", &batch_size.to_string());
+        ballista_config =
+            ballista_config.set_str(BALLISTA_DEFAULT_BATCH_SIZE, &batch_size.to_string());
     };
 
-    let ballista_config = ballista_config_builder.build()?;
-
     let ctx = match (args.host, args.port) {
         (Some(ref host), Some(port)) => {
+            let address = format!("df://{}:{}", host, port);
+            let state = SessionStateBuilder::new()
+                .with_config(ballista_config)
+                .with_default_features()
+                .build();
+
             // Distributed execution with Ballista Remote
-            BallistaContext::remote(host, port, &ballista_config).await?
+            SessionContext::remote_with_state(&address, state).await?
         }
         _ => {
-            let concurrent_tasks = if let Some(concurrent_tasks) = args.concurrent_tasks {
-                concurrent_tasks
-            } else {
-                num_cpus::get()
+            if let Some(concurrent_tasks) = args.concurrent_tasks {
+                ballista_config = ballista_config.set_str(
+                    BALLISTA_STANDALONE_PARALLELISM,
+                    &concurrent_tasks.to_string(),
+                );
             };
+            let state = SessionStateBuilder::new()
+                .with_config(ballista_config)
+                .with_default_features()
+                .build();
+
             // In-process execution with Ballista Standalone
-            BallistaContext::standalone(&ballista_config, concurrent_tasks).await?
+            SessionContext::standalone_with_state(state).await?
         }
     };
 
diff --git a/ballista/client/src/context.rs b/ballista/client/src/context.rs
index b09e1d65..453296c2 100644
--- a/ballista/client/src/context.rs
+++ b/ballista/client/src/context.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 //! Distributed execution context.
+#![allow(deprecated)] // TO BE REMOVED
 
 use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::execution::context::DataFilePaths;
@@ -76,7 +77,7 @@ impl BallistaContextState {
     }
 }
 
-// #[deprecated]
+#[deprecated]
 pub struct BallistaContext {
     state: Arc<Mutex<BallistaContextState>>,
     context: Arc<SessionContext>,
diff --git a/ballista/client/src/prelude.rs b/ballista/client/src/prelude.rs
index 1b798877..d06a6530 100644
--- a/ballista/client/src/prelude.rs
+++ b/ballista/client/src/prelude.rs
@@ -28,7 +28,7 @@ pub use ballista_core::{
     error::{BallistaError, Result},
 };
 
-pub use futures::StreamExt;
-
+#[allow(deprecated)] // TO BE REMOVED
 pub use crate::context::BallistaContext;
 pub use crate::extension::SessionContextExt;
+pub use futures::StreamExt;
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 01dcade1..c3806277 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -18,9 +18,9 @@
 //! Benchmark derived from TPC-H. This is not an official TPC-H benchmark.
 
 use arrow_schema::SchemaBuilder;
-use ballista::context::BallistaContext;
+use ballista::extension::BallistaSessionConfigExt;
 use ballista::prelude::{
-    BallistaConfig, BALLISTA_COLLECT_STATISTICS, BALLISTA_DEFAULT_BATCH_SIZE,
+    SessionContextExt, BALLISTA_COLLECT_STATISTICS, BALLISTA_DEFAULT_BATCH_SIZE,
     BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, BALLISTA_JOB_NAME,
 };
 use datafusion::arrow::array::*;
@@ -30,6 +30,7 @@ use datafusion::datasource::listing::ListingTableUrl;
 use datafusion::datasource::{MemTable, TableProvider};
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::context::SessionState;
+use datafusion::execution::SessionStateBuilder;
 use datafusion::logical_expr::LogicalPlan;
 use datafusion::logical_expr::{expr::Cast, Expr};
 use datafusion::parquet::basic::Compression;
@@ -354,24 +355,27 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
     println!("Running benchmarks with the following options: {opt:?}");
     let mut benchmark_run = BenchmarkRun::new(opt.query);
 
-    let config = BallistaConfig::builder()
-        .set(
+    let config = SessionConfig::new_with_ballista()
+        .set_str(
             BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
             &format!("{}", opt.partitions),
         )
-        .set(
+        .set_str(
             BALLISTA_JOB_NAME,
             &format!("Query derived from TPC-H q{}", opt.query),
         )
-        .set(BALLISTA_DEFAULT_BATCH_SIZE, &format!("{}", opt.batch_size))
-        .set(BALLISTA_COLLECT_STATISTICS, "true")
-        .build()
-        .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
-
-    let ctx =
-        BallistaContext::remote(opt.host.unwrap().as_str(), opt.port.unwrap(), &config)
-            .await
-            .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
+        .set_str(BALLISTA_DEFAULT_BATCH_SIZE, &format!("{}", opt.batch_size))
+        .set_str(BALLISTA_COLLECT_STATISTICS, "true");
+    let state = SessionStateBuilder::new()
+        .with_default_features()
+        .with_config(config)
+        .build();
+    let address = format!(
+        "df://{}:{}",
+        opt.host.clone().unwrap().as_str(),
+        opt.port.unwrap()
+    );
+    let ctx = SessionContext::remote_with_state(&address, state).await?;
 
     // register tables with Ballista context
     let path = opt.path.to_str().unwrap();
@@ -454,29 +458,29 @@ fn write_summary_json(benchmark_run: &mut BenchmarkRun, path: &Path) -> Result<(
 async fn loadtest_ballista(opt: BallistaLoadtestOpt) -> Result<()> {
     println!("Running loadtest_ballista with the following options: {opt:?}");
 
-    let config = BallistaConfig::builder()
-        .set(
+    let config = SessionConfig::new_with_ballista()
+        .set_str(
             BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
             &format!("{}", opt.partitions),
         )
-        .set(BALLISTA_DEFAULT_BATCH_SIZE, &format!("{}", opt.batch_size))
-        .build()
-        .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
+        .set_str(BALLISTA_DEFAULT_BATCH_SIZE, &format!("{}", opt.batch_size));
+
+    let state = SessionStateBuilder::new()
+        .with_default_features()
+        .with_config(config)
+        .build();
 
     let concurrency = opt.concurrency;
     let request_amount = opt.requests;
     let mut clients = vec![];
 
     for _num in 0..concurrency {
-        clients.push(
-            BallistaContext::remote(
-                opt.host.clone().unwrap().as_str(),
-                opt.port.unwrap(),
-                &config,
-            )
-            .await
-            .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?,
+        let address = format!(
+            "df://{}:{}",
+            opt.host.clone().unwrap().as_str(),
+            opt.port.unwrap()
         );
+        clients.push(SessionContext::remote_with_state(&address, state.clone()).await?);
     }
 
     // register tables with Ballista context
@@ -566,7 +570,7 @@ fn get_query_sql_by_path(query: usize, mut sql_path: String) -> Result<String> {
 async fn register_tables(
     path: &str,
     file_format: &str,
-    ctx: &BallistaContext,
+    ctx: &SessionContext,
     debug: bool,
 ) -> Result<()> {
     for table in TABLES {
diff --git a/examples/examples/remote-dataframe.rs b/examples/examples/remote-dataframe.rs
index 6b190cea..ae6d6531 100644
--- a/examples/examples/remote-dataframe.rs
+++ b/examples/examples/remote-dataframe.rs
@@ -15,28 +15,36 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use ballista::prelude::*;
-use datafusion::prelude::{col, lit, ParquetReadOptions};
+use ballista::{extension::BallistaSessionConfigExt, prelude::*};
+use ballista_examples::test_util;
+use datafusion::{
+    execution::SessionStateBuilder,
+    prelude::{col, lit, ParquetReadOptions, SessionConfig, SessionContext},
+};
 
 /// This example demonstrates executing a simple query against an Arrow data source (Parquet) and
 /// fetching results, using the DataFrame trait
 #[tokio::main]
 async fn main() -> Result<()> {
-    let config = BallistaConfig::builder()
-        .set("ballista.shuffle.partitions", "4")
-        .build()?;
-    let ctx = BallistaContext::remote("localhost", 50050, &config).await?;
+    let config = SessionConfig::new_with_ballista()
+        .set_str(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "4");
 
-    let filename = "testdata/alltypes_plain.parquet";
+    let state = SessionStateBuilder::new()
+        .with_config(config)
+        .with_default_features()
+        .build();
+
+    let ctx = SessionContext::remote_with_state("df://localhost:50050", state).await?;
+
+    let test_data = test_util::examples_test_data();
+    let filename = format!("{test_data}/alltypes_plain.parquet");
 
-    // define the query using the DataFrame trait
     let df = ctx
         .read_parquet(filename, ParquetReadOptions::default())
         .await?
         .select_columns(&["id", "bool_col", "timestamp_col"])?
         .filter(col("id").gt(lit(1)))?;
 
-    // print the results
     df.show().await?;
 
     Ok(())
diff --git a/examples/examples/remote-sql.rs b/examples/examples/remote-sql.rs
index f8afad56..26ed11bc 100644
--- a/examples/examples/remote-sql.rs
+++ b/examples/examples/remote-sql.rs
@@ -15,27 +15,36 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use ballista::prelude::*;
-use datafusion::prelude::CsvReadOptions;
+use ballista::{extension::BallistaSessionConfigExt, prelude::*};
+use ballista_examples::test_util;
+use datafusion::{
+    execution::SessionStateBuilder,
+    prelude::{CsvReadOptions, SessionConfig, SessionContext},
+};
 
 /// This example demonstrates executing a simple query against an Arrow data source (CSV) and
 /// fetching results, using SQL
 #[tokio::main]
 async fn main() -> Result<()> {
-    let config = BallistaConfig::builder()
-        .set("ballista.shuffle.partitions", "4")
-        .build()?;
-    let ctx = BallistaContext::remote("localhost", 50050, &config).await?;
+    let config = SessionConfig::new_with_ballista()
+        .set_str(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "4");
+
+    let state = SessionStateBuilder::new()
+        .with_config(config)
+        .with_default_features()
+        .build();
+
+    let ctx = SessionContext::remote_with_state("df://localhost:50050", state).await?;
+
+    let test_data = test_util::examples_test_data();
 
-    // register csv file with the execution context
     ctx.register_csv(
         "test",
-        "testdata/aggregate_test_100.csv",
+        &format!("{test_data}/aggregate_test_100.csv"),
         CsvReadOptions::new(),
     )
     .await?;
 
-    // execute the query
     let df = ctx
         .sql(
             "SELECT c1, MIN(c12), MAX(c12) \
@@ -45,7 +54,6 @@ async fn main() -> Result<()> {
         )
         .await?;
 
-    // print the results
     df.show().await?;
 
     Ok(())
diff --git a/examples/examples/standalone-sql.rs b/examples/examples/standalone-sql.rs
index 0427caa7..d1e637e7 100644
--- a/examples/examples/standalone-sql.rs
+++ b/examples/examples/standalone-sql.rs
@@ -15,24 +15,38 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use ballista::prelude::{BallistaConfig, BallistaContext, Result};
+use ballista::{
+    extension::BallistaSessionConfigExt,
+    prelude::{
+        Result, SessionContextExt, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
+        BALLISTA_STANDALONE_PARALLELISM,
+    },
+};
 use ballista_examples::test_util;
-use datafusion::execution::options::ParquetReadOptions;
+use datafusion::{
+    execution::{options::ParquetReadOptions, SessionStateBuilder},
+    prelude::{SessionConfig, SessionContext},
+};
 
 #[tokio::main]
 async fn main() -> Result<()> {
-    let config = BallistaConfig::builder()
-        .set("ballista.shuffle.partitions", "1")
-        .build()?;
+    let config = SessionConfig::new_with_ballista()
+        .set_str(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "1")
+        .set_str(BALLISTA_STANDALONE_PARALLELISM, "2");
 
-    let ctx = BallistaContext::standalone(&config, 2).await?;
+    let state = SessionStateBuilder::new()
+        .with_config(config)
+        .with_default_features()
+        .build();
 
-    let testdata = test_util::examples_test_data();
+    let ctx = SessionContext::standalone_with_state(state).await?;
+
+    let test_data = test_util::examples_test_data();
 
     // register parquet file with the execution context
     ctx.register_parquet(
         "test",
-        &format!("{testdata}/alltypes_plain.parquet"),
+        &format!("{test_data}/alltypes_plain.parquet"),
         ParquetReadOptions::default(),
     )
     .await?;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to