This is an automated email from the ASF dual-hosted git repository.

ytyou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 0283077c63 Test: Validate memory limit for sort queries to extended 
test (#14142)
0283077c63 is described below

commit 0283077c63d7b9a7464bc75f0686774cd5d10924
Author: Yongting You <[email protected]>
AuthorDate: Sun Jan 19 13:55:55 2025 +0800

    Test: Validate memory limit for sort queries to extended test (#14142)
    
    * External memory limit validation for sort
    
    * add bug tracker
    
    * cleanup
    
    * Update submodule
    
    * reviews
    
    * fix CI
    
    * move feature to module level
---
 .github/workflows/extended.yml                     |  39 +++-
 datafusion/core/Cargo.toml                         |   2 +
 .../memory_limit/memory_limit_validation/mod.rs    |  22 ++
 .../memory_limit_validation/sort_mem_validation.rs | 223 +++++++++++++++++++++
 .../memory_limit/memory_limit_validation/utils.rs  | 186 +++++++++++++++++
 datafusion/core/tests/memory_limit/mod.rs          |   2 +
 parquet-testing                                    |   2 +-
 testing                                            |   2 +-
 8 files changed, 475 insertions(+), 3 deletions(-)

diff --git a/.github/workflows/extended.yml b/.github/workflows/extended.yml
index b98e0a1740..1cc39ae76c 100644
--- a/.github/workflows/extended.yml
+++ b/.github/workflows/extended.yml
@@ -33,6 +33,42 @@ on:
       - main
 
 jobs:
+  # Check crate compiles and base cargo check passes
+  linux-build-lib:
+    name: linux build test
+    runs-on: ubuntu-latest
+    container:
+      image: amd64/rust
+    steps:
+      - uses: actions/checkout@v4
+      - name: Setup Rust toolchain
+        uses: ./.github/actions/setup-builder
+        with:
+          rust-version: stable
+      - name: Prepare cargo build
+        run: cargo check --profile ci --all-targets
+
+  # Run extended tests (with feature 'extended_tests')
+  linux-test-extended:
+    name: cargo test (amd64)
+    needs: linux-build-lib
+    runs-on: ubuntu-latest
+    container:
+      image: amd64/rust
+    steps:
+      - uses: actions/checkout@v4
+        with:
+          submodules: true
+          fetch-depth: 1
+      - name: Setup Rust toolchain
+        uses: ./.github/actions/setup-builder
+        with:
+          rust-version: stable
+      - name: Run tests (excluding doctests)
+        run: cargo test --profile ci --exclude datafusion-examples --exclude 
datafusion-benchmarks --workspace --lib --tests --bins --features 
avro,json,backtrace,extended_tests
+      - name: Verify Working Directory Clean
+        run: git diff --exit-code
+
   # Check answers are correct when hash values collide
   hash-collisions:
     name: cargo test hash collisions (amd64)
@@ -51,7 +87,8 @@ jobs:
       - name: Run tests
         run: |
           cd datafusion
-          cargo test  --profile ci --exclude datafusion-examples --exclude 
datafusion-benchmarks --exclude datafusion-sqllogictest --workspace --lib 
--tests --features=force_hash_collisions,avro
+          cargo test  --profile ci --exclude datafusion-examples --exclude 
datafusion-benchmarks --exclude datafusion-sqllogictest --workspace --lib 
--tests --features=force_hash_collisions,avro,extended_tests
+
   sqllogictest-sqlite:
     name: "Run sqllogictests with the sqlite test suite"
     runs-on: ubuntu-latest
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index e341816b2b..149bf8beb9 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -80,6 +80,7 @@ unicode_expressions = [
     "datafusion-sql/unicode_expressions",
     "datafusion-functions/unicode_expressions",
 ]
+extended_tests = []
 
 [dependencies]
 apache-avro = { version = "0.17", optional = true }
@@ -150,6 +151,7 @@ rand_distr = "0.4.3"
 regex = { workspace = true }
 rstest = { workspace = true }
 serde_json = { workspace = true }
+sysinfo = "0.33.1"
 test-utils = { path = "../../test-utils" }
 tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot", 
"fs"] }
 
diff --git a/datafusion/core/tests/memory_limit/memory_limit_validation/mod.rs 
b/datafusion/core/tests/memory_limit/memory_limit_validation/mod.rs
new file mode 100644
index 0000000000..32df6c5d62
--- /dev/null
+++ b/datafusion/core/tests/memory_limit/memory_limit_validation/mod.rs
@@ -0,0 +1,22 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Validates that a query's actual memory usage is consistent with the specified 
memory
+//! limit.
+
+mod sort_mem_validation;
+mod utils;
diff --git 
a/datafusion/core/tests/memory_limit/memory_limit_validation/sort_mem_validation.rs
 
b/datafusion/core/tests/memory_limit/memory_limit_validation/sort_mem_validation.rs
new file mode 100644
index 0000000000..1789f37535
--- /dev/null
+++ 
b/datafusion/core/tests/memory_limit/memory_limit_validation/sort_mem_validation.rs
@@ -0,0 +1,223 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Memory limit validation tests for the sort queries
+//!
+//! These tests must run in separate processes to accurately measure memory 
usage.
+//! This file is organized as:
+//! - Test runners that spawn individual test processes
+//! - Test cases that contain the actual validation logic
+use std::{process::Command, str};
+
+use log::info;
+
+use crate::memory_limit::memory_limit_validation::utils;
+
+// ===========================================================================
+// Test runners:
+// Runners are split into multiple tests to run in parallel
+// ===========================================================================
+
+#[test]
+fn memory_limit_validation_runner_works_runner() {
+    spawn_test_process("memory_limit_validation_runner_works");
+}
+
+#[test]
+fn sort_no_mem_limit_runner() {
+    spawn_test_process("sort_no_mem_limit");
+}
+
+#[test]
+fn sort_with_mem_limit_1_runner() {
+    spawn_test_process("sort_with_mem_limit_1");
+}
+
+#[test]
+fn sort_with_mem_limit_2_runner() {
+    spawn_test_process("sort_with_mem_limit_2");
+}
+
+#[test]
+fn sort_with_mem_limit_3_runner() {
+    spawn_test_process("sort_with_mem_limit_3");
+}
+
+#[test]
+fn sort_with_mem_limit_2_cols_1_runner() {
+    spawn_test_process("sort_with_mem_limit_2_cols_1");
+}
+
+#[test]
+fn sort_with_mem_limit_2_cols_2_runner() {
+    spawn_test_process("sort_with_mem_limit_2_cols_2");
+}
+
+/// Helper function that executes a test in a separate process with the 
required environment
+/// variable set. Memory limit validation tasks need to measure memory 
resident set
+/// size (RSS), so they must run in a separate process.
+fn spawn_test_process(test: &str) {
+    let test_path = format!(
+        "memory_limit::memory_limit_validation::sort_mem_validation::{}",
+        test
+    );
+    info!("Running test: {}", test_path);
+
+    // Run the test command
+    let output = Command::new("cargo")
+        .arg("test")
+        .arg("--package")
+        .arg("datafusion")
+        .arg("--test")
+        .arg("core_integration")
+        .arg("--features")
+        .arg("extended_tests")
+        .arg("--")
+        .arg(&test_path)
+        .arg("--exact")
+        .arg("--nocapture")
+        .env("DATAFUSION_TEST_MEM_LIMIT_VALIDATION", "1")
+        .output()
+        .expect("Failed to execute test command");
+
+    // Convert output to strings
+    let stdout = str::from_utf8(&output.stdout).unwrap_or("");
+    let stderr = str::from_utf8(&output.stderr).unwrap_or("");
+
+    info!("{}", stdout);
+
+    assert!(
+        output.status.success(),
+        "Test '{}' failed with status: {}\nstdout:\n{}\nstderr:\n{}",
+        test,
+        output.status,
+        stdout,
+        stderr
+    );
+}
+
+// ===========================================================================
+// Test cases:
+// All following tests need to be run through their individual test wrapper.
+// When run directly, environment variable 
`DATAFUSION_TEST_MEM_LIMIT_VALIDATION`
+// is not set, test will return with a no-op.
+//
+// If some tests consistently fail, suppress by setting a larger expected 
memory
+// usage (e.g. 80_000_000 * 3 -> 80_000_000 * 4)
+// ===========================================================================
+
+/// Test runner itself: if memory limit violated, test should fail.
+#[tokio::test]
+async fn memory_limit_validation_runner_works() {
+    if std::env::var("DATAFUSION_TEST_MEM_LIMIT_VALIDATION").is_err() {
+        println!("Skipping test because DATAFUSION_TEST_MEM_LIMIT_VALIDATION 
is not set");
+
+        return;
+    }
+
+    let result = std::panic::catch_unwind(|| {
+        tokio::runtime::Runtime::new().unwrap().block_on(async {
+            utils::validate_query_with_memory_limits(
+                20_000_000, // set an impossible limit: query requires at 
least 80MB
+                None,
+                "select * from generate_series(1,10000000) as t1(c1) order by 
c1",
+                "select * from generate_series(1,1000000) as t1(c1) order by 
c1", // Baseline query with ~10% of data
+            )
+            .await;
+        })
+    });
+
+    assert!(
+        result.is_err(),
+        "Expected the query to panic due to memory limit"
+    );
+}
+
+#[tokio::test]
+async fn sort_no_mem_limit() {
+    utils::validate_query_with_memory_limits(
+        80_000_000 * 3,
+        None,
+        "select * from generate_series(1,10000000) as t1(c1) order by c1",
+        "select * from generate_series(1,1000000) as t1(c1) order by c1", // 
Baseline query with ~10% of data
+    )
+    .await;
+}
+
+#[tokio::test]
+async fn sort_with_mem_limit_1() {
+    utils::validate_query_with_memory_limits(
+        40_000_000 * 5,
+        Some(40_000_000),
+        "select * from generate_series(1,10000000) as t1(c1) order by c1",
+        "select * from generate_series(1,1000000) as t1(c1) order by c1", // 
Baseline query with ~10% of data
+    )
+    .await;
+}
+
+#[tokio::test]
+async fn sort_with_mem_limit_2() {
+    utils::validate_query_with_memory_limits(
+        80_000_000 * 3,
+        Some(80_000_000),
+        "select * from generate_series(1,10000000) as t1(c1) order by c1",
+        "select * from generate_series(1,1000000) as t1(c1) order by c1", // 
Baseline query with ~10% of data
+    )
+    .await;
+}
+
+#[tokio::test]
+async fn sort_with_mem_limit_3() {
+    utils::validate_query_with_memory_limits(
+        80_000_000 * 3,
+        Some(80_000_000 * 10), // mem limit is large enough so that no spill 
happens
+        "select * from generate_series(1,10000000) as t1(c1) order by c1",
+        "select * from generate_series(1,1000000) as t1(c1) order by c1", // 
Baseline query with ~10% of data
+    )
+    .await;
+}
+
+#[tokio::test]
+async fn sort_with_mem_limit_2_cols_1() {
+    let memory_usage_in_theory = 80_000_000 * 2; // 2 columns
+    let expected_max_mem_usage = memory_usage_in_theory * 4;
+    utils::validate_query_with_memory_limits(
+        expected_max_mem_usage,
+        None,
+        "select c1, c1 as c2 from generate_series(1,10000000) as t1(c1) order 
by c2 DESC, c1 ASC NULLS LAST",
+        "select c1, c1 as c2 from generate_series(1,1000000) as t1(c1) order 
by c2 DESC, c1 ASC NULLS LAST", // Baseline query with ~10% of data
+    )
+    .await;
+}
+
+// TODO: Query fails, fix it
+// Issue: https://github.com/apache/datafusion/issues/14143
+#[ignore]
+#[tokio::test]
+async fn sort_with_mem_limit_2_cols_2() {
+    let memory_usage_in_theory = 80_000_000 * 2; // 2 columns
+    let expected_max_mem_usage = memory_usage_in_theory * 3;
+    let mem_limit = memory_usage_in_theory as f64 * 0.5;
+
+    utils::validate_query_with_memory_limits(
+        expected_max_mem_usage,
+        Some(mem_limit as i64),
+        "select c1, c1 as c2 from generate_series(1,10000000) as t1(c1) order 
by c2 DESC, c1 ASC NULLS LAST",
+        "select c1, c1 as c2 from generate_series(1,1000000) as t1(c1) order 
by c2 DESC, c1 ASC NULLS LAST", // Baseline query with ~10% of data
+    )
+    .await;
+}
diff --git 
a/datafusion/core/tests/memory_limit/memory_limit_validation/utils.rs 
b/datafusion/core/tests/memory_limit/memory_limit_validation/utils.rs
new file mode 100644
index 0000000000..bdf30c140a
--- /dev/null
+++ b/datafusion/core/tests/memory_limit/memory_limit_validation/utils.rs
@@ -0,0 +1,186 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_common_runtime::SpawnedTask;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+use sysinfo::System;
+use tokio::time::{interval, Duration};
+
+use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_execution::{
+    memory_pool::{human_readable_size, FairSpillPool},
+    runtime_env::RuntimeEnvBuilder,
+};
+
+/// Measures the maximum RSS (in bytes) during the execution of an async task. 
RSS
+/// will be sampled every 7ms.
+///
+/// # Arguments
+///
+/// * `f` - A closure that returns the async task to be measured.
+///
+/// # Returns
+///
+/// A tuple containing the result of the async task and the maximum RSS 
observed.
+async fn measure_max_rss<F, Fut, T>(f: F) -> (T, usize)
+where
+    F: FnOnce() -> Fut,
+    Fut: std::future::Future<Output = T>,
+{
+    // Initialize system information
+    let mut system = System::new_all();
+    system.refresh_all();
+
+    // Get the current process ID
+    let pid = sysinfo::get_current_pid().expect("Failed to get current PID");
+
+    // Shared atomic variable to store max RSS
+    let max_rss = Arc::new(AtomicUsize::new(0));
+
+    // Clone for the monitoring task
+    let max_rss_clone = Arc::clone(&max_rss);
+
+    // Spawn a monitoring task
+    let monitor_handle = SpawnedTask::spawn(async move {
+        let mut sys = System::new_all();
+        let mut interval = interval(Duration::from_millis(7));
+
+        loop {
+            interval.tick().await;
+            sys.refresh_all();
+            if let Some(process) = sys.process(pid) {
+                let rss_bytes = process.memory();
+                max_rss_clone
+                    .fetch_update(Ordering::Relaxed, Ordering::Relaxed, 
|current| {
+                        if rss_bytes as usize > current {
+                            Some(rss_bytes as usize)
+                        } else {
+                            None
+                        }
+                    })
+                    .ok();
+            } else {
+                // Process no longer exists
+                break;
+            }
+        }
+    });
+
+    // Execute the async task
+    let result = f().await;
+
+    // Give some time for the monitor to catch the final memory state
+    tokio::time::sleep(Duration::from_millis(200)).await;
+
+    // Terminate the monitoring task
+    drop(monitor_handle);
+
+    // Retrieve the maximum RSS
+    let peak_rss = max_rss.load(Ordering::Relaxed);
+
+    (result, peak_rss)
+}
+
+/// Query runner that validates the memory usage of the query.
+///
+/// Note this function is supposed to run in a separate process for accurate 
memory
+/// estimation. If environment variable `DATAFUSION_TEST_MEM_LIMIT_VALIDATION` 
is
+/// not set, this function will return immediately, so test cases calling this 
function
+/// should first set the environment variable, then create a new process to 
run.
+/// See `sort_mem_validation.rs` for more details.
+///
+/// # Arguments
+///
+/// * `expected_mem_bytes` - The maximum expected memory usage for the query.
+/// * `mem_limit_bytes` - The memory limit of the query in bytes. `None` means 
no
+///   memory limit is presented.
+/// * `query` - The SQL query to execute
+/// * `baseline_query` - The SQL query to execute for estimating constant 
overhead.
+///   This query should use 10% of the data of the main query.
+///
+/// # Example
+///
+///     utils::validate_query_with_memory_limits(
+///         40_000_000 * 2,                   
+///         Some(40_000_000),              
+///         "SELECT * FROM generate_series(1, 100000000) AS t(i) ORDER BY i",
+///         "SELECT * FROM generate_series(1, 10000000) AS t(i) ORDER BY i"
+///     );
+///
+/// The above function call means:
+/// Set the memory limit to 40MB, and the profiled memory usage of {query - 
baseline_query}
+/// should be less than 40MB * 2.
+pub async fn validate_query_with_memory_limits(
+    expected_mem_bytes: i64,
+    mem_limit_bytes: Option<i64>,
+    query: &str,
+    baseline_query: &str,
+) {
+    if std::env::var("DATAFUSION_TEST_MEM_LIMIT_VALIDATION").is_err() {
+        println!("Skipping test because DATAFUSION_TEST_MEM_LIMIT_VALIDATION 
is not set");
+
+        return;
+    }
+
+    println!("Current process ID: {}", std::process::id());
+
+    let runtime_builder = RuntimeEnvBuilder::new();
+
+    let runtime = match mem_limit_bytes {
+        Some(mem_limit_bytes) => runtime_builder
+            .with_memory_pool(Arc::new(FairSpillPool::new(mem_limit_bytes as 
usize)))
+            .build_arc()
+            .unwrap(),
+        None => runtime_builder.build_arc().unwrap(),
+    };
+
+    let session_config = SessionConfig::new().with_target_partitions(4); // 
Make sure the configuration is the same if test is running on different machines
+
+    let ctx = SessionContext::new_with_config_rt(session_config, runtime);
+
+    let df = ctx.sql(query).await.unwrap();
+    // Run a query with 10% data to estimate the constant overhead
+    let df_small = ctx.sql(baseline_query).await.unwrap();
+
+    let (_, baseline_max_rss) =
+        measure_max_rss(|| async { df_small.collect().await.unwrap() }).await;
+
+    let (_, max_rss) = measure_max_rss(|| async { df.collect().await.unwrap() 
}).await;
+
+    println!(
+        "Memory before: {}, Memory after: {}",
+        human_readable_size(baseline_max_rss),
+        human_readable_size(max_rss)
+    );
+
+    let actual_mem_usage = max_rss as f64 - baseline_max_rss as f64;
+
+    println!(
+        "Query: {}, Memory usage: {}, Memory limit: {}",
+        query,
+        human_readable_size(actual_mem_usage as usize),
+        human_readable_size(expected_mem_bytes as usize)
+    );
+
+    assert!(
+        actual_mem_usage < expected_mem_bytes as f64,
+        "Memory usage exceeded the theoretical limit. Actual: {}, Expected 
limit: {}",
+        human_readable_size(actual_mem_usage as usize),
+        human_readable_size(expected_mem_bytes as usize)
+    );
+}
diff --git a/datafusion/core/tests/memory_limit/mod.rs 
b/datafusion/core/tests/memory_limit/mod.rs
index c7514d1c24..b6f2f8e9ac 100644
--- a/datafusion/core/tests/memory_limit/mod.rs
+++ b/datafusion/core/tests/memory_limit/mod.rs
@@ -16,6 +16,8 @@
 // under the License.
 
 //! This module contains tests for limiting memory at runtime in DataFusion
+#[cfg(feature = "extended_tests")]
+mod memory_limit_validation;
 
 use arrow::datatypes::{Int32Type, SchemaRef};
 use arrow::record_batch::RecordBatch;
diff --git a/parquet-testing b/parquet-testing
index e45cd23f78..f4d7ed772a 160000
--- a/parquet-testing
+++ b/parquet-testing
@@ -1 +1 @@
-Subproject commit e45cd23f784aab3d6bf0701f8f4e621469ed3be7
+Subproject commit f4d7ed772a62a95111db50fbcad2460833e8c882
diff --git a/testing b/testing
index 98fceecd02..d2a1371230 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit 98fceecd024dccd2f8a00e32fc144975f218acf4
+Subproject commit d2a13712303498963395318a4eb42872e66aead7


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to