This is an automated email from the ASF dual-hosted git repository.
ytyou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 0283077c63 Test: Validate memory limit for sort queries to extended
test (#14142)
0283077c63 is described below
commit 0283077c63d7b9a7464bc75f0686774cd5d10924
Author: Yongting You <[email protected]>
AuthorDate: Sun Jan 19 13:55:55 2025 +0800
Test: Validate memory limit for sort queries to extended test (#14142)
* External memory limit validation for sort
* add bug tracker
* cleanup
* Update submodule
* reviews
* fix CI
* move feature to module level
---
.github/workflows/extended.yml | 39 +++-
datafusion/core/Cargo.toml | 2 +
.../memory_limit/memory_limit_validation/mod.rs | 22 ++
.../memory_limit_validation/sort_mem_validation.rs | 223 +++++++++++++++++++++
.../memory_limit/memory_limit_validation/utils.rs | 186 +++++++++++++++++
datafusion/core/tests/memory_limit/mod.rs | 2 +
parquet-testing | 2 +-
testing | 2 +-
8 files changed, 475 insertions(+), 3 deletions(-)
diff --git a/.github/workflows/extended.yml b/.github/workflows/extended.yml
index b98e0a1740..1cc39ae76c 100644
--- a/.github/workflows/extended.yml
+++ b/.github/workflows/extended.yml
@@ -33,6 +33,42 @@ on:
- main
jobs:
+  # Compile every target once, so that build breakage is reported before the
+  # (much slower) extended test job starts.
+  linux-build-lib:
+    name: linux build test
+    runs-on: ubuntu-latest
+    container:
+      image: amd64/rust
+    steps:
+      - uses: actions/checkout@v4
+      - name: Setup Rust toolchain
+        uses: ./.github/actions/setup-builder
+        with:
+          rust-version: stable
+      - name: Prepare cargo build
+        run: cargo check --profile ci --all-targets
+
+  # Extended tests: the whole workspace with the `extended_tests` feature on.
+  # Test data lives in git submodules, so they are checked out as well.
+  linux-test-extended:
+    name: cargo test (amd64)
+    needs: linux-build-lib
+    runs-on: ubuntu-latest
+    container:
+      image: amd64/rust
+    steps:
+      - uses: actions/checkout@v4
+        with:
+          submodules: true
+          fetch-depth: 1
+      - name: Setup Rust toolchain
+        uses: ./.github/actions/setup-builder
+        with:
+          rust-version: stable
+      - name: Run tests (excluding doctests)
+        # Folded scalar: the lines below are joined with single spaces.
+        run: >-
+          cargo test --profile ci
+          --exclude datafusion-examples
+          --exclude datafusion-benchmarks
+          --workspace --lib --tests --bins
+          --features avro,json,backtrace,extended_tests
+      - name: Verify Working Directory Clean
+        run: git diff --exit-code
+
# Check answers are correct when hash values collide
hash-collisions:
name: cargo test hash collisions (amd64)
@@ -51,7 +87,8 @@ jobs:
- name: Run tests
run: |
cd datafusion
- cargo test --profile ci --exclude datafusion-examples --exclude
datafusion-benchmarks --exclude datafusion-sqllogictest --workspace --lib
--tests --features=force_hash_collisions,avro
+ cargo test --profile ci --exclude datafusion-examples --exclude
datafusion-benchmarks --exclude datafusion-sqllogictest --workspace --lib
--tests --features=force_hash_collisions,avro,extended_tests
+
sqllogictest-sqlite:
name: "Run sqllogictests with the sqlite test suite"
runs-on: ubuntu-latest
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index e341816b2b..149bf8beb9 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -80,6 +80,7 @@ unicode_expressions = [
"datafusion-sql/unicode_expressions",
"datafusion-functions/unicode_expressions",
]
+extended_tests = []
[dependencies]
apache-avro = { version = "0.17", optional = true }
@@ -150,6 +151,7 @@ rand_distr = "0.4.3"
regex = { workspace = true }
rstest = { workspace = true }
serde_json = { workspace = true }
+sysinfo = "0.33.1"
test-utils = { path = "../../test-utils" }
tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot",
"fs"] }
diff --git a/datafusion/core/tests/memory_limit/memory_limit_validation/mod.rs
b/datafusion/core/tests/memory_limit/memory_limit_validation/mod.rs
new file mode 100644
index 0000000000..32df6c5d62
--- /dev/null
+++ b/datafusion/core/tests/memory_limit/memory_limit_validation/mod.rs
@@ -0,0 +1,22 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Validates that a query's actual memory usage is consistent with the
+//! specified memory limit.
+
+mod sort_mem_validation;
+mod utils;
diff --git
a/datafusion/core/tests/memory_limit/memory_limit_validation/sort_mem_validation.rs
b/datafusion/core/tests/memory_limit/memory_limit_validation/sort_mem_validation.rs
new file mode 100644
index 0000000000..1789f37535
--- /dev/null
+++
b/datafusion/core/tests/memory_limit/memory_limit_validation/sort_mem_validation.rs
@@ -0,0 +1,223 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Memory limit validation tests for sort queries.
+//!
+//! These tests must run in separate processes to measure memory usage
+//! accurately. This file is organized as:
+//! - Test runners that spawn individual test processes
+//! - Test cases that contain the actual validation logic
+use std::{process::Command, str};
+
+use log::info;
+
+use crate::memory_limit::memory_limit_validation::utils;
+
+// ===========================================================================
+// Test runners:
+// Runners are split into multiple tests so that they can run in parallel.
+// ===========================================================================
+
+/// Declares a `#[test]` named `$runner` that executes the test case `$case`
+/// in a separate process through `spawn_test_process`.
+macro_rules! process_runner {
+    ($runner:ident => $case:literal) => {
+        #[test]
+        fn $runner() {
+            spawn_test_process($case);
+        }
+    };
+}
+
+process_runner!(memory_limit_validation_runner_works_runner => "memory_limit_validation_runner_works");
+process_runner!(sort_no_mem_limit_runner => "sort_no_mem_limit");
+process_runner!(sort_with_mem_limit_1_runner => "sort_with_mem_limit_1");
+process_runner!(sort_with_mem_limit_2_runner => "sort_with_mem_limit_2");
+process_runner!(sort_with_mem_limit_3_runner => "sort_with_mem_limit_3");
+process_runner!(sort_with_mem_limit_2_cols_1_runner => "sort_with_mem_limit_2_cols_1");
+process_runner!(sort_with_mem_limit_2_cols_2_runner => "sort_with_mem_limit_2_cols_2");
+
+/// Executes a single test case in a separate process with the environment
+/// variable `DATAFUSION_TEST_MEM_LIMIT_VALIDATION` set, and fails if the
+/// child fails. Memory limit validation measures the resident set size (RSS)
+/// of the whole process, so each case must run in a process of its own.
+///
+/// The child is the currently running test binary (`std::env::current_exe`),
+/// filtered to exactly one test. Re-invoking `cargo test` instead would
+/// rebuild `core_integration` with the default profile and only the
+/// `extended_tests` feature. That differs from how the parent was built (CI
+/// uses `--profile ci` and extra features), so every runner would trigger a
+/// second full build and contend for the cargo build lock.
+///
+/// # Panics
+///
+/// Panics if the child cannot be started, exits unsuccessfully, or did not
+/// run exactly one test.
+fn spawn_test_process(test: &str) {
+    let test_path = format!(
+        "memory_limit::memory_limit_validation::sort_mem_validation::{}",
+        test
+    );
+    info!("Running test: {}", test_path);
+
+    let test_binary = std::env::current_exe()
+        .expect("Failed to locate the currently running test binary");
+
+    // `--exact` is essential: without it the filter would also match the
+    // runner `<test>_runner` itself, and the child would spawn recursively.
+    let output = Command::new(test_binary)
+        .arg(&test_path)
+        .arg("--exact")
+        .arg("--nocapture")
+        .env("DATAFUSION_TEST_MEM_LIMIT_VALIDATION", "1")
+        .output()
+        .expect("Failed to execute test command");
+
+    // Convert output to strings
+    let stdout = str::from_utf8(&output.stdout).unwrap_or("");
+    let stderr = str::from_utf8(&output.stderr).unwrap_or("");
+
+    info!("{}", stdout);
+
+    assert!(
+        output.status.success(),
+        "Test '{}' failed with status: {}\nstdout:\n{}\nstderr:\n{}",
+        test,
+        output.status,
+        stdout,
+        stderr
+    );
+
+    // libtest exits successfully when the filter matches nothing. Without this
+    // check, a renamed test or module would turn the runner into a silent
+    // no-op. An `#[ignore]`d case still reports "running 1 test".
+    assert!(
+        stdout.contains("running 1 test"),
+        "Filter '{}' did not select exactly one test\nstdout:\n{}\nstderr:\n{}",
+        test_path,
+        stdout,
+        stderr
+    );
+}
+
+// ===========================================================================
+// Test cases:
+// All of the following tests must be run through their individual runner
+// wrappers. When a test is run directly, the environment variable
+// `DATAFUSION_TEST_MEM_LIMIT_VALIDATION` is not set, so it returns
+// immediately as a no-op.
+//
+// If some tests consistently fail, suppress the failure by setting a larger
+// expected memory usage (e.g. 80_000_000 * 3 -> 80_000_000 * 4).
+// ===========================================================================
+
+/// Test runner itself: if memory limit violated, test should fail.
+///
+/// This is a plain `#[test]` because it drives the query on its own Tokio
+/// runtime. Calling `Runtime::block_on` from inside a `#[tokio::test]` runtime
+/// panics with "Cannot start a runtime from within a runtime". `catch_unwind`
+/// would then accept that panic as the expected failure, and the test would
+/// pass without validating anything. For the same reason, the panic message
+/// is checked so that only the memory-limit assertion counts as success.
+#[test]
+fn memory_limit_validation_runner_works() {
+    if std::env::var("DATAFUSION_TEST_MEM_LIMIT_VALIDATION").is_err() {
+        println!("Skipping test because DATAFUSION_TEST_MEM_LIMIT_VALIDATION is not set");
+
+        return;
+    }
+
+    let result = std::panic::catch_unwind(|| {
+        tokio::runtime::Runtime::new().unwrap().block_on(async {
+            utils::validate_query_with_memory_limits(
+                20_000_000, // set an impossible limit: query requires at least 80MB
+                None,
+                "select * from generate_series(1,10000000) as t1(c1) order by c1",
+                "select * from generate_series(1,1000000) as t1(c1) order by c1", // Baseline query with ~10% of data
+            )
+            .await;
+        })
+    });
+
+    let panic_payload =
+        result.expect_err("Expected the query to panic due to memory limit");
+    // `assert!` with format arguments panics with a `String` payload, while
+    // literal-only panics carry a `&'static str`.
+    let panic_message = panic_payload
+        .downcast_ref::<String>()
+        .map(String::as_str)
+        .or_else(|| panic_payload.downcast_ref::<&str>().copied())
+        .unwrap_or("<non-string panic payload>");
+    assert!(
+        panic_message.contains("Memory usage exceeded the theoretical limit"),
+        "Expected a memory limit violation, but the query panicked with: {}",
+        panic_message
+    );
+}
+
+/// Sort of 10M rows with no memory pool limit. The extra peak RSS over the
+/// baseline must stay below 3x the ~80MB of input data.
+#[tokio::test]
+async fn sort_no_mem_limit() {
+    let max_expected_bytes = 80_000_000 * 3;
+    let full_query = "select * from generate_series(1,10000000) as t1(c1) order by c1";
+    // Baseline query with ~10% of data
+    let baseline = "select * from generate_series(1,1000000) as t1(c1) order by c1";
+
+    utils::validate_query_with_memory_limits(max_expected_bytes, None, full_query, baseline)
+        .await;
+}
+
+/// Sort of 10M rows under a 40MB memory pool. The extra peak RSS over the
+/// baseline must stay below 5x the pool size.
+#[tokio::test]
+async fn sort_with_mem_limit_1() {
+    let pool_limit: i64 = 40_000_000;
+    let max_expected_bytes = pool_limit * 5;
+    let full_query = "select * from generate_series(1,10000000) as t1(c1) order by c1";
+    // Baseline query with ~10% of data
+    let baseline = "select * from generate_series(1,1000000) as t1(c1) order by c1";
+
+    utils::validate_query_with_memory_limits(
+        max_expected_bytes,
+        Some(pool_limit),
+        full_query,
+        baseline,
+    )
+    .await;
+}
+
+/// Sort of 10M rows under an 80MB memory pool. The extra peak RSS over the
+/// baseline must stay below 3x the pool size.
+#[tokio::test]
+async fn sort_with_mem_limit_2() {
+    let pool_limit: i64 = 80_000_000;
+    let max_expected_bytes = pool_limit * 3;
+    let full_query = "select * from generate_series(1,10000000) as t1(c1) order by c1";
+    // Baseline query with ~10% of data
+    let baseline = "select * from generate_series(1,1000000) as t1(c1) order by c1";
+
+    utils::validate_query_with_memory_limits(
+        max_expected_bytes,
+        Some(pool_limit),
+        full_query,
+        baseline,
+    )
+    .await;
+}
+
+/// Sort of 10M rows with a pool large enough (800MB) that no spill happens.
+/// The extra peak RSS over the baseline must stay below 240MB.
+#[tokio::test]
+async fn sort_with_mem_limit_3() {
+    let max_expected_bytes = 80_000_000 * 3;
+    // mem limit is large enough so that no spill happens
+    let pool_limit = 80_000_000 * 10;
+    let full_query = "select * from generate_series(1,10000000) as t1(c1) order by c1";
+    // Baseline query with ~10% of data
+    let baseline = "select * from generate_series(1,1000000) as t1(c1) order by c1";
+
+    utils::validate_query_with_memory_limits(
+        max_expected_bytes,
+        Some(pool_limit),
+        full_query,
+        baseline,
+    )
+    .await;
+}
+
+/// Two-column, two-key sort of 10M rows with no memory pool limit. The extra
+/// peak RSS over the baseline must stay below 4x the theoretical data size.
+#[tokio::test]
+async fn sort_with_mem_limit_2_cols_1() {
+    // Two columns of ~80MB each.
+    let theoretical_bytes = 80_000_000 * 2;
+    let full_query = "select c1, c1 as c2 from generate_series(1,10000000) as t1(c1) order by c2 DESC, c1 ASC NULLS LAST";
+    // Baseline query with ~10% of data
+    let baseline = "select c1, c1 as c2 from generate_series(1,1000000) as t1(c1) order by c2 DESC, c1 ASC NULLS LAST";
+
+    utils::validate_query_with_memory_limits(theoretical_bytes * 4, None, full_query, baseline)
+        .await;
+}
+
+// TODO: Query fails, fix it
+// Issue: https://github.com/apache/datafusion/issues/14143
+/// Two-column, two-key sort of 10M rows under a pool of half the theoretical
+/// data size. The extra peak RSS over the baseline must stay below 3x the
+/// theoretical data size.
+#[ignore]
+#[tokio::test]
+async fn sort_with_mem_limit_2_cols_2() {
+    // Two columns of ~80MB each.
+    let theoretical_bytes: i64 = 80_000_000 * 2;
+    // Half of the theoretical size; exact for this even value.
+    let pool_limit = theoretical_bytes / 2;
+    let full_query = "select c1, c1 as c2 from generate_series(1,10000000) as t1(c1) order by c2 DESC, c1 ASC NULLS LAST";
+    // Baseline query with ~10% of data
+    let baseline = "select c1, c1 as c2 from generate_series(1,1000000) as t1(c1) order by c2 DESC, c1 ASC NULLS LAST";
+
+    utils::validate_query_with_memory_limits(
+        theoretical_bytes * 3,
+        Some(pool_limit),
+        full_query,
+        baseline,
+    )
+    .await;
+}
diff --git
a/datafusion/core/tests/memory_limit/memory_limit_validation/utils.rs
b/datafusion/core/tests/memory_limit/memory_limit_validation/utils.rs
new file mode 100644
index 0000000000..bdf30c140a
--- /dev/null
+++ b/datafusion/core/tests/memory_limit/memory_limit_validation/utils.rs
@@ -0,0 +1,186 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_common_runtime::SpawnedTask;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+use sysinfo::System;
+use tokio::time::{interval, Duration};
+
+use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_execution::{
+ memory_pool::{human_readable_size, FairSpillPool},
+ runtime_env::RuntimeEnvBuilder,
+};
+
+/// Measures the maximum RSS (in bytes) of the current process during the
+/// execution of an async task. RSS is sampled every 7ms.
+///
+/// Sampling runs on a blocking-pool thread that drives its own
+/// single-threaded runtime. Its timer therefore keeps firing even while the
+/// measured task monopolizes the caller's runtime, e.g. a CPU-bound sort
+/// under the default current-thread `#[tokio::test]` runtime. After the task
+/// completes, the monitor records at least one more sample before it is
+/// joined.
+///
+/// # Arguments
+///
+/// * `f` - A closure that returns the async task to be measured.
+///
+/// # Returns
+///
+/// A tuple containing the result of the async task and the maximum RSS
+/// observed.
+async fn measure_max_rss<F, Fut, T>(f: F) -> (T, usize)
+where
+    F: FnOnce() -> Fut,
+    Fut: std::future::Future<Output = T>,
+{
+    use std::sync::atomic::AtomicBool;
+
+    /// Raises the stop flag when dropped, so the monitor also exits if the
+    /// measured task panics or is cancelled. Without this, the monitor would
+    /// loop forever and runtime shutdown would wait on it.
+    struct StopOnDrop(Arc<AtomicBool>);
+
+    impl Drop for StopOnDrop {
+        fn drop(&mut self) {
+            self.0.store(true, Ordering::Relaxed);
+        }
+    }
+
+    // Get the current process ID
+    let pid = sysinfo::get_current_pid().expect("Failed to get current PID");
+
+    // Highest RSS observed so far, updated by the monitor.
+    let max_rss = Arc::new(AtomicUsize::new(0));
+    // Set once the measured task has finished (or unwound).
+    let stop = Arc::new(AtomicBool::new(false));
+
+    let monitor_max_rss = Arc::clone(&max_rss);
+    let monitor_stop = Arc::clone(&stop);
+    let monitor_handle = SpawnedTask::spawn_blocking(move || {
+        let monitor_rt = tokio::runtime::Builder::new_current_thread()
+            .enable_time()
+            .build()
+            .expect("Failed to build the RSS monitor runtime");
+        monitor_rt.block_on(async move {
+            let mut sys = System::new();
+            let mut ticker = interval(Duration::from_millis(7));
+            loop {
+                ticker.tick().await;
+                // Read the flag before sampling, so the iteration that
+                // observes it still records a sample taken after the task.
+                let finished = monitor_stop.load(Ordering::Relaxed);
+                // Refresh only this process, not every process on the host.
+                sys.refresh_processes(sysinfo::ProcessesToUpdate::Some(&[pid]), true);
+                let Some(process) = sys.process(pid) else {
+                    // Process no longer exists
+                    break;
+                };
+                monitor_max_rss.fetch_max(process.memory() as usize, Ordering::Relaxed);
+                if finished {
+                    break;
+                }
+            }
+        });
+    });
+
+    // Execute the async task
+    let stop_guard = StopOnDrop(stop);
+    let result = f().await;
+    drop(stop_guard);
+
+    // Joining also establishes that every sample is visible below.
+    monitor_handle
+        .join()
+        .await
+        .expect("RSS monitor task failed");
+
+    (result, max_rss.load(Ordering::Relaxed))
+}
+
+/// Query runner that validates the memory usage of the query.
+///
+/// Note this function is supposed to run in a separate process for accurate
memory
+/// estimation. If environment variable `DATAFUSION_TEST_MEM_LIMIT_VALIDATION`
is
+/// not set, this function will return immediately, so test cases calls this
function
+/// should first set the environment variable, then create a new process to
run.
+/// See `sort_mem_validation.rs` for more details.
+///
+/// # Arguments
+///
+/// * `expected_mem_bytes` - The maximum expected memory usage for the query.
+/// * `mem_limit_bytes` - The memory limit of the query in bytes. `None` means
no
+/// memory limit is presented.
+/// * `query` - The SQL query to execute
+/// * `baseline_query` - The SQL query to execute for estimating constant
overhead.
+/// This query should use 10% of the data of the main query.
+///
+/// # Example
+///
+/// utils::validate_query_with_memory_limits(
+/// 40_000_000 * 2,
+/// Some(40_000_000),
+/// "SELECT * FROM generate_series(1, 100000000) AS t(i) ORDER BY i",
+/// "SELECT * FROM generate_series(1, 10000000) AS t(i) ORDER BY i"
+/// );
+///
+/// The above function call means:
+/// Set the memory limit to 40MB, and the profiled memory usage of {query -
baseline_query}
+/// should be less than 40MB * 2.
+pub async fn validate_query_with_memory_limits(
+ expected_mem_bytes: i64,
+ mem_limit_bytes: Option<i64>,
+ query: &str,
+ baseline_query: &str,
+) {
+ if std::env::var("DATAFUSION_TEST_MEM_LIMIT_VALIDATION").is_err() {
+ println!("Skipping test because DATAFUSION_TEST_MEM_LIMIT_VALIDATION
is not set");
+
+ return;
+ }
+
+ println!("Current process ID: {}", std::process::id());
+
+ let runtime_builder = RuntimeEnvBuilder::new();
+
+ let runtime = match mem_limit_bytes {
+ Some(mem_limit_bytes) => runtime_builder
+ .with_memory_pool(Arc::new(FairSpillPool::new(mem_limit_bytes as
usize)))
+ .build_arc()
+ .unwrap(),
+ None => runtime_builder.build_arc().unwrap(),
+ };
+
+ let session_config = SessionConfig::new().with_target_partitions(4); //
Make sure the configuration is the same if test is running on different machines
+
+ let ctx = SessionContext::new_with_config_rt(session_config, runtime);
+
+ let df = ctx.sql(query).await.unwrap();
+ // Run a query with 10% data to estimate the constant overhead
+ let df_small = ctx.sql(baseline_query).await.unwrap();
+
+ let (_, baseline_max_rss) =
+ measure_max_rss(|| async { df_small.collect().await.unwrap() }).await;
+
+ let (_, max_rss) = measure_max_rss(|| async { df.collect().await.unwrap()
}).await;
+
+ println!(
+ "Memory before: {}, Memory after: {}",
+ human_readable_size(baseline_max_rss),
+ human_readable_size(max_rss)
+ );
+
+ let actual_mem_usage = max_rss as f64 - baseline_max_rss as f64;
+
+ println!(
+ "Query: {}, Memory usage: {}, Memory limit: {}",
+ query,
+ human_readable_size(actual_mem_usage as usize),
+ human_readable_size(expected_mem_bytes as usize)
+ );
+
+ assert!(
+ actual_mem_usage < expected_mem_bytes as f64,
+ "Memory usage exceeded the theoretical limit. Actual: {}, Expected
limit: {}",
+ human_readable_size(actual_mem_usage as usize),
+ human_readable_size(expected_mem_bytes as usize)
+ );
+}
diff --git a/datafusion/core/tests/memory_limit/mod.rs
b/datafusion/core/tests/memory_limit/mod.rs
index c7514d1c24..b6f2f8e9ac 100644
--- a/datafusion/core/tests/memory_limit/mod.rs
+++ b/datafusion/core/tests/memory_limit/mod.rs
@@ -16,6 +16,8 @@
// under the License.
//! This module contains tests for limiting memory at runtime in DataFusion
+#[cfg(feature = "extended_tests")]
+mod memory_limit_validation;
use arrow::datatypes::{Int32Type, SchemaRef};
use arrow::record_batch::RecordBatch;
diff --git a/parquet-testing b/parquet-testing
index e45cd23f78..f4d7ed772a 160000
--- a/parquet-testing
+++ b/parquet-testing
@@ -1 +1 @@
-Subproject commit e45cd23f784aab3d6bf0701f8f4e621469ed3be7
+Subproject commit f4d7ed772a62a95111db50fbcad2460833e8c882
diff --git a/testing b/testing
index 98fceecd02..d2a1371230 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit 98fceecd024dccd2f8a00e32fc144975f218acf4
+Subproject commit d2a13712303498963395318a4eb42872e66aead7
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]