This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new d056fb5f25 #15507 -- extract tokio runtime creation from hot loop (#15508)
d056fb5f25 is described below
commit d056fb5f255923a53ceec395c0ec57e005183c61
Author: Bruce Ritchie <[email protected]>
AuthorDate: Mon Mar 31 20:00:38 2025 -0400
#15507 -- extract tokio runtime creation from hot loop (#15508)
---
datafusion/core/benches/aggregate_query_sql.rs | 19 ++++++++-
datafusion/core/benches/csv_load.rs | 11 +++++-
datafusion/core/benches/dataframe.rs | 8 ++--
datafusion/core/benches/distinct_query_sql.rs | 9 ++++-
datafusion/core/benches/filter_query_sql.rs | 9 +++--
datafusion/core/benches/math_query_sql.rs | 14 +++----
datafusion/core/benches/physical_plan.rs | 10 +++--
datafusion/core/benches/sort_limit_query_sql.rs | 15 +++----
datafusion/core/benches/sql_planner.rs | 52 +++++++++++++++----------
datafusion/core/benches/struct_query_sql.rs | 9 ++---
datafusion/core/benches/window_query_sql.rs | 16 +++++++-
11 files changed, 112 insertions(+), 60 deletions(-)
diff --git a/datafusion/core/benches/aggregate_query_sql.rs b/datafusion/core/benches/aggregate_query_sql.rs
index ebe94450c1..b29bfc4873 100644
--- a/datafusion/core/benches/aggregate_query_sql.rs
+++ b/datafusion/core/benches/aggregate_query_sql.rs
@@ -29,8 +29,7 @@ use parking_lot::Mutex;
use std::sync::Arc;
use tokio::runtime::Runtime;
-fn query(ctx: Arc<Mutex<SessionContext>>, sql: &str) {
- let rt = Runtime::new().unwrap();
+fn query(ctx: Arc<Mutex<SessionContext>>, rt: &Runtime, sql: &str) {
let df = rt.block_on(ctx.lock().sql(sql)).unwrap();
criterion::black_box(rt.block_on(df.collect()).unwrap());
}
@@ -51,11 +50,13 @@ fn criterion_benchmark(c: &mut Criterion) {
let array_len = 32768 * 2; // 2^16
let batch_size = 2048; // 2^11
let ctx = create_context(partitions_len, array_len, batch_size).unwrap();
+ let rt = Runtime::new().unwrap();
c.bench_function("aggregate_query_no_group_by 15 12", |b| {
b.iter(|| {
query(
ctx.clone(),
+ &rt,
"SELECT MIN(f64), AVG(f64), COUNT(f64) \
FROM t",
)
@@ -66,6 +67,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
+ &rt,
"SELECT MIN(f64), MAX(f64) \
FROM t",
)
@@ -76,6 +78,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
+ &rt,
"SELECT COUNT(DISTINCT u64_wide) \
FROM t",
)
@@ -86,6 +89,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
+ &rt,
"SELECT COUNT(DISTINCT u64_narrow) \
FROM t",
)
@@ -96,6 +100,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
+ &rt,
"SELECT utf8, MIN(f64), AVG(f64), COUNT(f64) \
FROM t GROUP BY utf8",
)
@@ -106,6 +111,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
+ &rt,
"SELECT utf8, MIN(f64), AVG(f64), COUNT(f64) \
FROM t \
WHERE f32 > 10 AND f32 < 20 GROUP BY utf8",
@@ -117,6 +123,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
+ &rt,
"SELECT u64_narrow, MIN(f64), AVG(f64), COUNT(f64) \
FROM t GROUP BY u64_narrow",
)
@@ -127,6 +134,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
+ &rt,
"SELECT u64_narrow, MIN(f64), AVG(f64), COUNT(f64) \
FROM t \
WHERE f32 > 10 AND f32 < 20 GROUP BY u64_narrow",
@@ -138,6 +146,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
+ &rt,
"SELECT u64_wide, utf8, MIN(f64), AVG(f64), COUNT(f64) \
FROM t GROUP BY u64_wide, utf8",
)
@@ -148,6 +157,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
+ &rt,
"SELECT utf8, approx_percentile_cont(u64_wide, 0.5, 2500) \
FROM t GROUP BY utf8",
)
@@ -158,6 +168,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
+ &rt,
"SELECT utf8, approx_percentile_cont(f32, 0.5, 2500) \
FROM t GROUP BY utf8",
)
@@ -168,6 +179,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
+ &rt,
"SELECT MEDIAN(DISTINCT u64_wide), MEDIAN(DISTINCT u64_narrow)
\
FROM t",
)
@@ -178,6 +190,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
+ &rt,
"SELECT first_value(u64_wide order by f64, u64_narrow, utf8),\
last_value(u64_wide order by f64, u64_narrow,
utf8) \
FROM t GROUP BY u64_narrow",
@@ -189,6 +202,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
+ &rt,
"SELECT first_value(u64_wide ignore nulls order by f64,
u64_narrow, utf8), \
last_value(u64_wide ignore nulls order by f64,
u64_narrow, utf8) \
FROM t GROUP BY u64_narrow",
@@ -200,6 +214,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
+ &rt,
"SELECT first_value(u64_wide order by f64), \
last_value(u64_wide order by f64) \
FROM t GROUP BY u64_narrow",
diff --git a/datafusion/core/benches/csv_load.rs b/datafusion/core/benches/csv_load.rs
index 2d42121ec9..3f98475746 100644
--- a/datafusion/core/benches/csv_load.rs
+++ b/datafusion/core/benches/csv_load.rs
@@ -32,8 +32,12 @@ use std::time::Duration;
use test_utils::AccessLogGenerator;
use tokio::runtime::Runtime;
-fn load_csv(ctx: Arc<Mutex<SessionContext>>, path: &str, options:
CsvReadOptions) {
- let rt = Runtime::new().unwrap();
+fn load_csv(
+ ctx: Arc<Mutex<SessionContext>>,
+ rt: &Runtime,
+ path: &str,
+ options: CsvReadOptions,
+) {
let df = rt.block_on(ctx.lock().read_csv(path, options)).unwrap();
criterion::black_box(rt.block_on(df.collect()).unwrap());
}
@@ -61,6 +65,7 @@ fn generate_test_file() -> TestCsvFile {
fn criterion_benchmark(c: &mut Criterion) {
let ctx = create_context().unwrap();
+ let rt = Runtime::new().unwrap();
let test_file = generate_test_file();
let mut group = c.benchmark_group("load csv testing");
@@ -70,6 +75,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
load_csv(
ctx.clone(),
+ &rt,
test_file.path().to_str().unwrap(),
CsvReadOptions::default(),
)
@@ -80,6 +86,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
load_csv(
ctx.clone(),
+ &rt,
test_file.path().to_str().unwrap(),
CsvReadOptions::default().null_regex(Some("^NULL$|^$".to_string())),
)
diff --git a/datafusion/core/benches/dataframe.rs b/datafusion/core/benches/dataframe.rs
index 03078e05e1..832553ebed 100644
--- a/datafusion/core/benches/dataframe.rs
+++ b/datafusion/core/benches/dataframe.rs
@@ -44,9 +44,7 @@ fn create_context(field_count: u32) -> datafusion_common::Result<Arc<SessionCont
Ok(Arc::new(ctx))
}
-fn run(column_count: u32, ctx: Arc<SessionContext>) {
- let rt = Runtime::new().unwrap();
-
+fn run(column_count: u32, ctx: Arc<SessionContext>, rt: &Runtime) {
criterion::black_box(rt.block_on(async {
let mut data_frame = ctx.table("t").await.unwrap();
@@ -67,11 +65,13 @@ fn run(column_count: u32, ctx: Arc<SessionContext>) {
}
fn criterion_benchmark(c: &mut Criterion) {
+ let rt = Runtime::new().unwrap();
+
for column_count in [10, 100, 200, 500] {
let ctx = create_context(column_count).unwrap();
c.bench_function(&format!("with_column_{column_count}"), |b| {
- b.iter(|| run(column_count, ctx.clone()))
+ b.iter(|| run(column_count, ctx.clone(), &rt))
});
}
}
diff --git a/datafusion/core/benches/distinct_query_sql.rs b/datafusion/core/benches/distinct_query_sql.rs
index ccc6a0e746..4992ae6607 100644
--- a/datafusion/core/benches/distinct_query_sql.rs
+++ b/datafusion/core/benches/distinct_query_sql.rs
@@ -33,8 +33,7 @@ use parking_lot::Mutex;
use std::{sync::Arc, time::Duration};
use tokio::runtime::Runtime;
-fn query(ctx: Arc<Mutex<SessionContext>>, sql: &str) {
- let rt = Runtime::new().unwrap();
+fn query(ctx: Arc<Mutex<SessionContext>>, rt: &Runtime, sql: &str) {
let df = rt.block_on(ctx.lock().sql(sql)).unwrap();
criterion::black_box(rt.block_on(df.collect()).unwrap());
}
@@ -55,6 +54,7 @@ fn criterion_benchmark_limited_distinct(c: &mut Criterion) {
let array_len = 1 << 26; // 64 M
let batch_size = 8192;
let ctx = create_context(partitions_len, array_len, batch_size).unwrap();
+ let rt = Runtime::new().unwrap();
let mut group = c.benchmark_group("custom-measurement-time");
group.measurement_time(Duration::from_secs(40));
@@ -63,6 +63,7 @@ fn criterion_benchmark_limited_distinct(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
+ &rt,
"SELECT DISTINCT u64_narrow FROM t GROUP BY u64_narrow LIMIT
10",
)
})
@@ -72,6 +73,7 @@ fn criterion_benchmark_limited_distinct(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
+ &rt,
"SELECT DISTINCT u64_narrow FROM t GROUP BY u64_narrow LIMIT
100",
)
})
@@ -81,6 +83,7 @@ fn criterion_benchmark_limited_distinct(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
+ &rt,
"SELECT DISTINCT u64_narrow FROM t GROUP BY u64_narrow LIMIT
1000",
)
})
@@ -90,6 +93,7 @@ fn criterion_benchmark_limited_distinct(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
+ &rt,
"SELECT DISTINCT u64_narrow FROM t GROUP BY u64_narrow LIMIT
10000",
)
})
@@ -99,6 +103,7 @@ fn criterion_benchmark_limited_distinct(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
+ &rt,
"SELECT u64_narrow, u64_wide, utf8, f64 FROM t GROUP BY 1, 2,
3, 4 LIMIT 10",
)
})
diff --git a/datafusion/core/benches/filter_query_sql.rs b/datafusion/core/benches/filter_query_sql.rs
index 0e09ae09d7..c82a160718 100644
--- a/datafusion/core/benches/filter_query_sql.rs
+++ b/datafusion/core/benches/filter_query_sql.rs
@@ -27,9 +27,7 @@ use futures::executor::block_on;
use std::sync::Arc;
use tokio::runtime::Runtime;
-async fn query(ctx: &SessionContext, sql: &str) {
- let rt = Runtime::new().unwrap();
-
+async fn query(ctx: &SessionContext, rt: &Runtime, sql: &str) {
// execute the query
let df = rt.block_on(ctx.sql(sql)).unwrap();
criterion::black_box(rt.block_on(df.collect()).unwrap());
@@ -68,10 +66,11 @@ fn create_context(array_len: usize, batch_size: usize) -> Result<SessionContext>
fn criterion_benchmark(c: &mut Criterion) {
let array_len = 524_288; // 2^19
let batch_size = 4096; // 2^12
+ let rt = Runtime::new().unwrap();
c.bench_function("filter_array", |b| {
let ctx = create_context(array_len, batch_size).unwrap();
- b.iter(|| block_on(query(&ctx, "select f32, f64 from t where f32 >=
f64")))
+ b.iter(|| block_on(query(&ctx, &rt, "select f32, f64 from t where f32
>= f64")))
});
c.bench_function("filter_scalar", |b| {
@@ -79,6 +78,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
block_on(query(
&ctx,
+ &rt,
"select f32, f64 from t where f32 >= 250 and f64 > 250",
))
})
@@ -89,6 +89,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
block_on(query(
&ctx,
+ &rt,
"select f32, f64 from t where f32 in (10, 20, 30, 40)",
))
})
diff --git a/datafusion/core/benches/math_query_sql.rs b/datafusion/core/benches/math_query_sql.rs
index 92c59d5066..76824850c1 100644
--- a/datafusion/core/benches/math_query_sql.rs
+++ b/datafusion/core/benches/math_query_sql.rs
@@ -36,9 +36,7 @@ use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::execution::context::SessionContext;
-fn query(ctx: Arc<Mutex<SessionContext>>, sql: &str) {
- let rt = Runtime::new().unwrap();
-
+fn query(ctx: Arc<Mutex<SessionContext>>, rt: &Runtime, sql: &str) {
// execute the query
let df = rt.block_on(ctx.lock().sql(sql)).unwrap();
rt.block_on(df.collect()).unwrap();
@@ -81,29 +79,31 @@ fn criterion_benchmark(c: &mut Criterion) {
let array_len = 1048576; // 2^20
let batch_size = 512; // 2^9
let ctx = create_context(array_len, batch_size).unwrap();
+ let rt = Runtime::new().unwrap();
+
c.bench_function("sqrt_20_9", |b| {
- b.iter(|| query(ctx.clone(), "SELECT sqrt(f32) FROM t"))
+ b.iter(|| query(ctx.clone(), &rt, "SELECT sqrt(f32) FROM t"))
});
let array_len = 1048576; // 2^20
let batch_size = 4096; // 2^12
let ctx = create_context(array_len, batch_size).unwrap();
c.bench_function("sqrt_20_12", |b| {
- b.iter(|| query(ctx.clone(), "SELECT sqrt(f32) FROM t"))
+ b.iter(|| query(ctx.clone(), &rt, "SELECT sqrt(f32) FROM t"))
});
let array_len = 4194304; // 2^22
let batch_size = 4096; // 2^12
let ctx = create_context(array_len, batch_size).unwrap();
c.bench_function("sqrt_22_12", |b| {
- b.iter(|| query(ctx.clone(), "SELECT sqrt(f32) FROM t"))
+ b.iter(|| query(ctx.clone(), &rt, "SELECT sqrt(f32) FROM t"))
});
let array_len = 4194304; // 2^22
let batch_size = 16384; // 2^14
let ctx = create_context(array_len, batch_size).unwrap();
c.bench_function("sqrt_22_14", |b| {
- b.iter(|| query(ctx.clone(), "SELECT sqrt(f32) FROM t"))
+ b.iter(|| query(ctx.clone(), &rt, "SELECT sqrt(f32) FROM t"))
});
}
diff --git a/datafusion/core/benches/physical_plan.rs b/datafusion/core/benches/physical_plan.rs
index aae1457ab9..0a65c52f72 100644
--- a/datafusion/core/benches/physical_plan.rs
+++ b/datafusion/core/benches/physical_plan.rs
@@ -42,6 +42,7 @@ use datafusion_physical_expr_common::sort_expr::LexOrdering;
// as inputs. All record batches must have the same schema.
fn sort_preserving_merge_operator(
session_ctx: Arc<SessionContext>,
+ rt: &Runtime,
batches: Vec<RecordBatch>,
sort: &[&str],
) {
@@ -63,7 +64,6 @@ fn sort_preserving_merge_operator(
.unwrap();
let merge = Arc::new(SortPreservingMergeExec::new(sort, exec));
let task_ctx = session_ctx.task_ctx();
- let rt = Runtime::new().unwrap();
rt.block_on(collect(merge, task_ctx)).unwrap();
}
@@ -166,14 +166,16 @@ fn criterion_benchmark(c: &mut Criterion) {
];
let ctx = Arc::new(SessionContext::new());
+ let rt = Runtime::new().unwrap();
+
for (name, input) in benches {
- let ctx_clone = ctx.clone();
- c.bench_function(name, move |b| {
+ c.bench_function(name, |b| {
b.iter_batched(
|| input.clone(),
|input| {
sort_preserving_merge_operator(
- ctx_clone.clone(),
+ ctx.clone(),
+ &rt,
input,
&["a", "b", "c", "d"],
);
diff --git a/datafusion/core/benches/sort_limit_query_sql.rs b/datafusion/core/benches/sort_limit_query_sql.rs
index cfd4b8bc4b..e535a01816 100644
--- a/datafusion/core/benches/sort_limit_query_sql.rs
+++ b/datafusion/core/benches/sort_limit_query_sql.rs
@@ -37,9 +37,7 @@ use datafusion::execution::context::SessionContext;
use tokio::runtime::Runtime;
-fn query(ctx: Arc<Mutex<SessionContext>>, sql: &str) {
- let rt = Runtime::new().unwrap();
-
+fn query(ctx: Arc<Mutex<SessionContext>>, rt: &Runtime, sql: &str) {
// execute the query
let df = rt.block_on(ctx.lock().sql(sql)).unwrap();
rt.block_on(df.collect()).unwrap();
@@ -104,11 +102,14 @@ fn create_context() -> Arc<Mutex<SessionContext>> {
}
fn criterion_benchmark(c: &mut Criterion) {
+ let ctx = create_context();
+ let rt = Runtime::new().unwrap();
+
c.bench_function("sort_and_limit_by_int", |b| {
- let ctx = create_context();
b.iter(|| {
query(
ctx.clone(),
+ &rt,
"SELECT c1, c13, c6, c10 \
FROM aggregate_test_100 \
ORDER BY c6
@@ -118,10 +119,10 @@ fn criterion_benchmark(c: &mut Criterion) {
});
c.bench_function("sort_and_limit_by_float", |b| {
- let ctx = create_context();
b.iter(|| {
query(
ctx.clone(),
+ &rt,
"SELECT c1, c13, c12 \
FROM aggregate_test_100 \
ORDER BY c13
@@ -131,10 +132,10 @@ fn criterion_benchmark(c: &mut Criterion) {
});
c.bench_function("sort_and_limit_lex_by_int", |b| {
- let ctx = create_context();
b.iter(|| {
query(
ctx.clone(),
+ &rt,
"SELECT c1, c13, c6, c10 \
FROM aggregate_test_100 \
ORDER BY c6 DESC, c10 DESC
@@ -144,10 +145,10 @@ fn criterion_benchmark(c: &mut Criterion) {
});
c.bench_function("sort_and_limit_lex_by_string", |b| {
- let ctx = create_context();
b.iter(|| {
query(
ctx.clone(),
+ &rt,
"SELECT c1, c13, c6, c10 \
FROM aggregate_test_100 \
ORDER BY c1, c13
diff --git a/datafusion/core/benches/sql_planner.rs b/datafusion/core/benches/sql_planner.rs
index 2d79778d4d..49cc830d58 100644
--- a/datafusion/core/benches/sql_planner.rs
+++ b/datafusion/core/benches/sql_planner.rs
@@ -45,14 +45,12 @@ const BENCHMARKS_PATH_2: &str = "./benchmarks/";
const CLICKBENCH_DATA_PATH: &str = "data/hits_partitioned/";
/// Create a logical plan from the specified sql
-fn logical_plan(ctx: &SessionContext, sql: &str) {
- let rt = Runtime::new().unwrap();
+fn logical_plan(ctx: &SessionContext, rt: &Runtime, sql: &str) {
criterion::black_box(rt.block_on(ctx.sql(sql)).unwrap());
}
/// Create a physical ExecutionPlan (by way of logical plan)
-fn physical_plan(ctx: &SessionContext, sql: &str) {
- let rt = Runtime::new().unwrap();
+fn physical_plan(ctx: &SessionContext, rt: &Runtime, sql: &str) {
criterion::black_box(rt.block_on(async {
ctx.sql(sql)
.await
@@ -104,9 +102,8 @@ fn register_defs(ctx: SessionContext, defs: Vec<TableDef>) -> SessionContext {
ctx
}
-fn register_clickbench_hits_table() -> SessionContext {
+fn register_clickbench_hits_table(rt: &Runtime) -> SessionContext {
let ctx = SessionContext::new();
- let rt = Runtime::new().unwrap();
// use an external table for clickbench benchmarks
let path =
@@ -128,7 +125,11 @@ fn register_clickbench_hits_table() -> SessionContext {
/// Target of this benchmark: control that placeholders replacing does not get slower,
/// if the query does not contain placeholders at all.
-fn benchmark_with_param_values_many_columns(ctx: &SessionContext, b: &mut
Bencher) {
+fn benchmark_with_param_values_many_columns(
+ ctx: &SessionContext,
+ rt: &Runtime,
+ b: &mut Bencher,
+) {
const COLUMNS_NUM: usize = 200;
let mut aggregates = String::new();
for i in 0..COLUMNS_NUM {
@@ -140,7 +141,6 @@ fn benchmark_with_param_values_many_columns(ctx: &SessionContext, b: &mut Benche
// SELECT max(attr0), ..., max(attrN) FROM t1.
let query = format!("SELECT {} FROM t1", aggregates);
let statement = ctx.state().sql_to_statement(&query, "Generic").unwrap();
- let rt = Runtime::new().unwrap();
let plan =
rt.block_on(async {
ctx.state().statement_to_plan(statement).await.unwrap() });
b.iter(|| {
@@ -230,33 +230,35 @@ fn criterion_benchmark(c: &mut Criterion) {
}
let ctx = create_context();
+ let rt = Runtime::new().unwrap();
// Test simplest
// https://github.com/apache/datafusion/issues/5157
c.bench_function("logical_select_one_from_700", |b| {
- b.iter(|| logical_plan(&ctx, "SELECT c1 FROM t700"))
+ b.iter(|| logical_plan(&ctx, &rt, "SELECT c1 FROM t700"))
});
// Test simplest
// https://github.com/apache/datafusion/issues/5157
c.bench_function("physical_select_one_from_700", |b| {
- b.iter(|| physical_plan(&ctx, "SELECT c1 FROM t700"))
+ b.iter(|| physical_plan(&ctx, &rt, "SELECT c1 FROM t700"))
});
// Test simplest
c.bench_function("logical_select_all_from_1000", |b| {
- b.iter(|| logical_plan(&ctx, "SELECT * FROM t1000"))
+ b.iter(|| logical_plan(&ctx, &rt, "SELECT * FROM t1000"))
});
// Test simplest
c.bench_function("physical_select_all_from_1000", |b| {
- b.iter(|| physical_plan(&ctx, "SELECT * FROM t1000"))
+ b.iter(|| physical_plan(&ctx, &rt, "SELECT * FROM t1000"))
});
c.bench_function("logical_trivial_join_low_numbered_columns", |b| {
b.iter(|| {
logical_plan(
&ctx,
+ &rt,
"SELECT t1.a2, t2.b2 \
FROM t1, t2 WHERE a1 = b1",
)
@@ -267,6 +269,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
logical_plan(
&ctx,
+ &rt,
"SELECT t1.a99, t2.b99 \
FROM t1, t2 WHERE a199 = b199",
)
@@ -277,6 +280,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
logical_plan(
&ctx,
+ &rt,
"SELECT t1.a99, MIN(t2.b1), MAX(t2.b199), AVG(t2.b123),
COUNT(t2.b73) \
FROM t1 JOIN t2 ON t1.a199 = t2.b199 GROUP BY t1.a99",
)
@@ -293,7 +297,7 @@ fn criterion_benchmark(c: &mut Criterion) {
}
let query = format!("SELECT {} FROM t1", aggregates);
b.iter(|| {
- physical_plan(&ctx, &query);
+ physical_plan(&ctx, &rt, &query);
});
});
@@ -302,6 +306,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
physical_plan(
&ctx,
+ &rt,
"SELECT t1.a7, t2.b8 \
FROM t1, t2 WHERE a7 = b7 \
ORDER BY a7",
@@ -313,6 +318,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
physical_plan(
&ctx,
+ &rt,
"SELECT t1.a7, t2.b8 \
FROM t1, t2 WHERE a7 < b7 \
ORDER BY a7",
@@ -324,6 +330,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
physical_plan(
&ctx,
+ &rt,
"SELECT ta.a9, tb.a10, tc.a11, td.a12, te.a13, tf.a14 \
FROM t1 AS ta, t1 AS tb, t1 AS tc, t1 AS td, t1 AS te, t1 AS
tf \
WHERE ta.a9 = tb.a10 AND tb.a10 = tc.a11 AND tc.a11 = td.a12
AND \
@@ -336,6 +343,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
physical_plan(
&ctx,
+ &rt,
"SELECT t1.a7 \
FROM t1 WHERE a7 = (SELECT b8 FROM t2)",
);
@@ -346,6 +354,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
physical_plan(
&ctx,
+ &rt,
"SELECT t1.a7 FROM t1 \
INTERSECT SELECT t2.b8 FROM t2",
);
@@ -356,6 +365,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
logical_plan(
&ctx,
+ &rt,
"SELECT DISTINCT t1.a7 \
FROM t1, t2 WHERE t1.a7 = t2.b8",
);
@@ -370,7 +380,7 @@ fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("physical_sorted_union_orderby", |b| {
// SELECT ... UNION ALL ...
let query = union_orderby_query(20);
- b.iter(|| physical_plan(&ctx, &query))
+ b.iter(|| physical_plan(&ctx, &rt, &query))
});
// --- TPC-H ---
@@ -393,7 +403,7 @@ fn criterion_benchmark(c: &mut Criterion) {
let sql =
std::fs::read_to_string(format!("{benchmarks_path}queries/{q}.sql")).unwrap();
c.bench_function(&format!("physical_plan_tpch_{}", q), |b| {
- b.iter(|| physical_plan(&tpch_ctx, &sql))
+ b.iter(|| physical_plan(&tpch_ctx, &rt, &sql))
});
}
@@ -407,7 +417,7 @@ fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("physical_plan_tpch_all", |b| {
b.iter(|| {
for sql in &all_tpch_sql_queries {
- physical_plan(&tpch_ctx, sql)
+ physical_plan(&tpch_ctx, &rt, sql)
}
})
});
@@ -442,7 +452,7 @@ fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("physical_plan_tpcds_all", |b| {
b.iter(|| {
for sql in &all_tpcds_sql_queries {
- physical_plan(&tpcds_ctx, sql)
+ physical_plan(&tpcds_ctx, &rt, sql)
}
})
});
@@ -468,7 +478,7 @@ fn criterion_benchmark(c: &mut Criterion) {
.map(|l| l.expect("Could not parse line"))
.collect_vec();
- let clickbench_ctx = register_clickbench_hits_table();
+ let clickbench_ctx = register_clickbench_hits_table(&rt);
// for (i, sql) in clickbench_queries.iter().enumerate() {
// c.bench_function(&format!("logical_plan_clickbench_q{}", i + 1),
|b| {
@@ -478,7 +488,7 @@ fn criterion_benchmark(c: &mut Criterion) {
for (i, sql) in clickbench_queries.iter().enumerate() {
c.bench_function(&format!("physical_plan_clickbench_q{}", i + 1), |b| {
- b.iter(|| physical_plan(&clickbench_ctx, sql))
+ b.iter(|| physical_plan(&clickbench_ctx, &rt, sql))
});
}
@@ -493,13 +503,13 @@ fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("physical_plan_clickbench_all", |b| {
b.iter(|| {
for sql in &clickbench_queries {
- physical_plan(&clickbench_ctx, sql)
+ physical_plan(&clickbench_ctx, &rt, sql)
}
})
});
c.bench_function("with_param_values_many_columns", |b| {
- benchmark_with_param_values_many_columns(&ctx, b);
+ benchmark_with_param_values_many_columns(&ctx, &rt, b);
});
}
diff --git a/datafusion/core/benches/struct_query_sql.rs b/datafusion/core/benches/struct_query_sql.rs
index 3ef7292c66..f9cc43d1ea 100644
--- a/datafusion/core/benches/struct_query_sql.rs
+++ b/datafusion/core/benches/struct_query_sql.rs
@@ -27,9 +27,7 @@ use futures::executor::block_on;
use std::sync::Arc;
use tokio::runtime::Runtime;
-async fn query(ctx: &SessionContext, sql: &str) {
- let rt = Runtime::new().unwrap();
-
+async fn query(ctx: &SessionContext, rt: &Runtime, sql: &str) {
// execute the query
let df = rt.block_on(ctx.sql(sql)).unwrap();
criterion::black_box(rt.block_on(df.collect()).unwrap());
@@ -68,10 +66,11 @@ fn create_context(array_len: usize, batch_size: usize) -> Result<SessionContext>
fn criterion_benchmark(c: &mut Criterion) {
let array_len = 524_288; // 2^19
let batch_size = 4096; // 2^12
+ let ctx = create_context(array_len, batch_size).unwrap();
+ let rt = Runtime::new().unwrap();
c.bench_function("struct", |b| {
- let ctx = create_context(array_len, batch_size).unwrap();
- b.iter(|| block_on(query(&ctx, "select struct(f32, f64) from t")))
+ b.iter(|| block_on(query(&ctx, &rt, "select struct(f32, f64) from t")))
});
}
diff --git a/datafusion/core/benches/window_query_sql.rs b/datafusion/core/benches/window_query_sql.rs
index 42a1e51be3..a55d17a7c5 100644
--- a/datafusion/core/benches/window_query_sql.rs
+++ b/datafusion/core/benches/window_query_sql.rs
@@ -29,8 +29,7 @@ use parking_lot::Mutex;
use std::sync::Arc;
use tokio::runtime::Runtime;
-fn query(ctx: Arc<Mutex<SessionContext>>, sql: &str) {
- let rt = Runtime::new().unwrap();
+fn query(ctx: Arc<Mutex<SessionContext>>, rt: &Runtime, sql: &str) {
let df = rt.block_on(ctx.lock().sql(sql)).unwrap();
criterion::black_box(rt.block_on(df.collect()).unwrap());
}
@@ -51,11 +50,13 @@ fn criterion_benchmark(c: &mut Criterion) {
let array_len = 1024 * 1024;
let batch_size = 8 * 1024;
let ctx = create_context(partitions_len, array_len, batch_size).unwrap();
+ let rt = Runtime::new().unwrap();
c.bench_function("window empty over, aggregate functions", |b| {
b.iter(|| {
query(
ctx.clone(),
+ &rt,
"SELECT \
MAX(f64) OVER (), \
MIN(f32) OVER (), \
@@ -69,6 +70,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
+ &rt,
"SELECT \
FIRST_VALUE(f64) OVER (), \
LAST_VALUE(f32) OVER (), \
@@ -82,6 +84,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
+ &rt,
"SELECT \
MAX(f64) OVER (ORDER BY u64_narrow), \
MIN(f32) OVER (ORDER BY u64_narrow DESC), \
@@ -95,6 +98,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
+ &rt,
"SELECT \
FIRST_VALUE(f64) OVER (ORDER BY u64_narrow), \
LAST_VALUE(f32) OVER (ORDER BY u64_narrow DESC), \
@@ -108,6 +112,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
+ &rt,
"SELECT \
MAX(f64) OVER (PARTITION BY u64_wide), \
MIN(f32) OVER (PARTITION BY u64_wide), \
@@ -123,6 +128,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
+ &rt,
"SELECT \
MAX(f64) OVER (PARTITION BY u64_narrow), \
MIN(f32) OVER (PARTITION BY u64_narrow), \
@@ -137,6 +143,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
+ &rt,
"SELECT \
FIRST_VALUE(f64) OVER (PARTITION BY u64_wide), \
LAST_VALUE(f32) OVER (PARTITION BY u64_wide), \
@@ -150,6 +157,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
+ &rt,
"SELECT \
FIRST_VALUE(f64) OVER (PARTITION BY u64_narrow), \
LAST_VALUE(f32) OVER (PARTITION BY u64_narrow), \
@@ -165,6 +173,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
+ &rt,
"SELECT \
MAX(f64) OVER (PARTITION BY u64_wide ORDER by f64), \
MIN(f32) OVER (PARTITION BY u64_wide ORDER by f64), \
@@ -181,6 +190,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
+ &rt,
"SELECT \
MAX(f64) OVER (PARTITION BY u64_narrow ORDER by f64), \
MIN(f32) OVER (PARTITION BY u64_narrow ORDER by f64), \
@@ -197,6 +207,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
+ &rt,
"SELECT \
FIRST_VALUE(f64) OVER (PARTITION BY u64_wide ORDER by
f64), \
LAST_VALUE(f32) OVER (PARTITION BY u64_wide ORDER by
f64), \
@@ -213,6 +224,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
+ &rt,
"SELECT \
FIRST_VALUE(f64) OVER (PARTITION BY u64_narrow ORDER
by f64), \
LAST_VALUE(f32) OVER (PARTITION BY u64_narrow ORDER by
f64), \
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]