This is an automated email from the ASF dual-hosted git repository. richox pushed a commit to branch dev-split-memmgr in repository https://gitbox.apache.org/repos/asf/auron.git
commit 5a49626d6686e26dd654302b783848ca00003961 Author: zhangli20 <[email protected]> AuthorDate: Fri Oct 24 11:58:39 2025 +0800 split memory manager to module auron-memmgr --- Cargo.lock | 34 +++++++++++++--------- Cargo.toml | 2 ++ native-engine/auron-memmgr/Cargo.toml | 20 +++++++++++++ .../src/memmgr/mod.rs => auron-memmgr/src/lib.rs} | 2 ++ .../src/memmgr => auron-memmgr/src}/metrics.rs | 0 .../src/memmgr => auron-memmgr/src}/spill.rs | 6 ++-- native-engine/auron-serde/Cargo.toml | 4 +-- native-engine/auron/Cargo.toml | 2 +- native-engine/auron/src/exec.rs | 2 +- native-engine/datafusion-ext-commons/Cargo.toml | 9 ++---- .../src/io}/ipc_compression.rs | 5 ++-- native-engine/datafusion-ext-commons/src/io/mod.rs | 1 + native-engine/datafusion-ext-exprs/Cargo.toml | 2 -- native-engine/datafusion-ext-functions/Cargo.toml | 3 +- native-engine/datafusion-ext-plans/Cargo.toml | 4 +-- native-engine/datafusion-ext-plans/src/agg/acc.rs | 7 ++--- .../datafusion-ext-plans/src/agg/agg_table.rs | 8 ++--- native-engine/datafusion-ext-plans/src/agg/avg.rs | 16 +++++----- .../datafusion-ext-plans/src/agg/bloom_filter.rs | 2 +- .../datafusion-ext-plans/src/agg/collect.rs | 4 +-- .../datafusion-ext-plans/src/agg/count.rs | 2 +- .../datafusion-ext-plans/src/agg/first.rs | 2 +- .../src/agg/spark_udaf_wrapper.rs | 2 +- native-engine/datafusion-ext-plans/src/agg_exec.rs | 6 ++-- .../src/common/execution_context.rs | 2 +- .../datafusion-ext-plans/src/common/mod.rs | 1 - .../datafusion-ext-plans/src/expand_exec.rs | 3 +- .../datafusion-ext-plans/src/ipc_reader_exec.rs | 6 ++-- .../datafusion-ext-plans/src/ipc_writer_exec.rs | 6 ++-- .../datafusion-ext-plans/src/joins/test.rs | 2 +- native-engine/datafusion-ext-plans/src/lib.rs | 3 -- .../datafusion-ext-plans/src/limit_exec.rs | 3 +- .../src/rss_shuffle_writer_exec.rs | 2 +- .../src/shuffle/buffered_data.rs | 2 +- .../src/shuffle/rss_single_repartitioner.rs | 7 ++--- .../src/shuffle/rss_sort_repartitioner.rs | 6 ++-- .../src/shuffle/single_repartitioner.rs | 6 ++-- .../src/shuffle/sort_repartitioner.rs | 8 ++--- .../src/shuffle_writer_exec.rs | 2 +- .../datafusion-ext-plans/src/sort_exec.rs | 28 +++++++++--------- 40 files changed, 120 insertions(+), 112 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 730901bb..0998d37d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -348,8 +348,8 @@ version = "0.1.0" dependencies = [ "arrow", "auron-jni-bridge", + "auron-memmgr", "auron-serde", - "bytesize", "chrono", "datafusion", "datafusion-ext-commons", @@ -381,6 +381,23 @@ dependencies = [ "paste", ] +[[package]] +name = "auron-memmgr" +version = "0.1.0" +dependencies = [ + "async-trait", + "auron-jni-bridge", + "bytesize", + "datafusion", + "datafusion-ext-commons", + "jni", + "log", + "once_cell", + "parking_lot", + "procfs", + "tempfile", +] + [[package]] name = "auron-serde" version = "0.1.0" @@ -388,12 +405,10 @@ dependencies = [ "arrow", "base64", "datafusion", - "datafusion-ext-commons", "datafusion-ext-exprs", "datafusion-ext-functions", "datafusion-ext-plans", "datafusion-spark", - "log", "object_store", "parking_lot", "prost 0.14.1", @@ -1162,26 +1177,24 @@ version = "0.1.0" dependencies = [ "arrow", "arrow-schema", - "async-trait", "auron-jni-bridge", "bigdecimal", "byteorder", - "bytes 1.10.1", "chrono", "datafusion", - "futures", "itertools 0.14.0", "jni", "log", + "lz4_flex", "num", "once_cell", "paste", "rand", "smallvec 2.0.0-alpha.11", - "tempfile", "tokio", "transpose", "unchecked-index", + "zstd", ] [[package]] @@ -1189,14 +1202,12 @@ name = "datafusion-ext-exprs" version = "0.1.0" dependencies = [ "arrow", - "async-trait", "auron-jni-bridge", "datafusion", "datafusion-ext-commons", "itertools 0.14.0", "jni", "log", - "num", "once_cell", "parking_lot", ] @@ -1206,7 +1217,6 @@ name = "datafusion-ext-functions" version = "0.1.0" dependencies = [ "arrow", - "async-trait", "auron-jni-bridge", "datafusion", "datafusion-ext-commons", @@ -1226,6 +1236,7 @@ dependencies = [ "arrow-schema", "async-trait", "auron-jni-bridge", + "auron-memmgr", "base64", "bitvec", "byteorder", @@ -1245,7 +1256,6 @@ dependencies = [ "itertools 0.14.0", "jni", "log", - "lz4_flex", "num", "object_store", "once_cell", @@ -1256,10 +1266,8 @@ dependencies = [ "procfs", "rand", "smallvec 2.0.0-alpha.11", - "tempfile", "tokio", "unchecked-index", - "zstd", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index beb2c99a..085e2feb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ members = [ "native-engine/auron", "native-engine/auron-jni-bridge", "native-engine/auron-serde", + "native-engine/auron-memmgr", ] [profile.release] @@ -48,6 +49,7 @@ overflow-checks = false auron = { path = "./native-engine/auron" } auron-jni-bridge = { path = "./native-engine/auron-jni-bridge" } auron-serde = { path = "./native-engine/auron-serde" } +auron-memmgr = { path = "./native-engine/auron-memmgr" } datafusion-ext-commons = { path = "./native-engine/datafusion-ext-commons" } datafusion-ext-exprs = { path = "./native-engine/datafusion-ext-exprs" } datafusion-ext-functions = { path = "./native-engine/datafusion-ext-functions" } diff --git a/native-engine/auron-memmgr/Cargo.toml b/native-engine/auron-memmgr/Cargo.toml new file mode 100644 index 00000000..a5b6f4c1 --- /dev/null +++ b/native-engine/auron-memmgr/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "auron-memmgr" +version = "0.1.0" +edition = "2024" + +[dependencies] +auron-jni-bridge = { workspace = true } +datafusion = { workspace = true } +datafusion-ext-commons = { workspace = true } + +async-trait = { workspace = true } +bytesize = { workspace = true } +jni = { workspace = true } +log = { workspace = true } +once_cell = { workspace = true } +tempfile = { workspace = true } +parking_lot = { workspace = true } + +[target.'cfg(target_os = "linux")'.dependencies] +procfs = { workspace = true } diff --git a/native-engine/datafusion-ext-plans/src/memmgr/mod.rs b/native-engine/auron-memmgr/src/lib.rs similarity index 99% rename from native-engine/datafusion-ext-plans/src/memmgr/mod.rs rename to native-engine/auron-memmgr/src/lib.rs index cb28f09c..e9cb9031 100644 --- a/native-engine/datafusion-ext-plans/src/memmgr/mod.rs +++ b/native-engine/auron-memmgr/src/lib.rs @@ -13,6 +13,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![feature(get_mut_unchecked)] + pub mod metrics; pub mod spill; diff --git a/native-engine/datafusion-ext-plans/src/memmgr/metrics.rs b/native-engine/auron-memmgr/src/metrics.rs similarity index 100% rename from native-engine/datafusion-ext-plans/src/memmgr/metrics.rs rename to native-engine/auron-memmgr/src/metrics.rs diff --git a/native-engine/datafusion-ext-plans/src/memmgr/spill.rs b/native-engine/auron-memmgr/src/spill.rs similarity index 98% rename from native-engine/datafusion-ext-plans/src/memmgr/spill.rs rename to native-engine/auron-memmgr/src/spill.rs index e3b8be07..3e7ef0b4 100644 --- a/native-engine/datafusion-ext-plans/src/memmgr/spill.rs +++ b/native-engine/auron-memmgr/src/spill.rs @@ -27,14 +27,12 @@ use auron_jni_bridge::{ jni_get_string, jni_new_direct_byte_buffer, jni_new_global_ref, }; use datafusion::{common::Result, parquet::file::reader::Length, physical_plan::metrics::Time}; +use datafusion_ext_commons::io::ipc_compression::{IoCompressionReader, IoCompressionWriter}; use jni::{objects::GlobalRef, sys::jlong}; use log::warn; use once_cell::sync::OnceCell; -use crate::{ - common::ipc_compression::{IoCompressionReader, IoCompressionWriter}, - memmgr::metrics::SpillMetrics, -}; +use crate::metrics::SpillMetrics; pub type SpillCompressedReader<'a> = IoCompressionReader<BufReader<Box<dyn Read + Send + 'a>>>; pub type SpillCompressedWriter<'a> = IoCompressionWriter<BufWriter<Box<dyn Write + Send + 'a>>>; diff --git a/native-engine/auron-serde/Cargo.toml b/native-engine/auron-serde/Cargo.toml index 12037e03..adb63dbf 100644 --- a/native-engine/auron-serde/Cargo.toml +++ b/native-engine/auron-serde/Cargo.toml @@ -26,17 +26,15 @@ default = ["prost/no-recursion-limit"] [dependencies] arrow = { workspace = true } datafusion = { workspace = true } -datafusion-ext-commons = { workspace = true } datafusion-ext-exprs = { workspace = true } datafusion-ext-functions = { workspace = true } datafusion-ext-plans = { workspace = true } datafusion-spark = { workspace = true } base64 = { workspace = true } -log = { workspace = true } object_store = { workspace = true } -prost = { workspace = true } parking_lot = { workspace = true } +prost = { workspace = true } [build-dependencies] tonic-build = { workspace = true } diff --git a/native-engine/auron/Cargo.toml b/native-engine/auron/Cargo.toml index 0afd6dbc..9e3f1353 100644 --- a/native-engine/auron/Cargo.toml +++ b/native-engine/auron/Cargo.toml @@ -33,12 +33,12 @@ http-service = [] [dependencies] arrow = { workspace = true } auron-jni-bridge = { workspace = true } +auron-memmgr = { workspace = true } auron-serde = { workspace = true } datafusion = { workspace = true } datafusion-ext-commons = { workspace = true } datafusion-ext-plans = { workspace = true } -bytesize = { workspace = true } futures = { workspace = true } jni = { workspace = true } log = { workspace = true } diff --git a/native-engine/auron/src/exec.rs b/native-engine/auron/src/exec.rs index d8e4bdf8..5a6eb43c 100644 --- a/native-engine/auron/src/exec.rs +++ b/native-engine/auron/src/exec.rs @@ -18,6 +18,7 @@ use auron_jni_bridge::{ jni_bridge::JavaClasses, *, }; +use auron_memmgr::MemManager; use datafusion::{ common::Result, error::DataFusionError, @@ -28,7 +29,6 @@ use datafusion::{ }, prelude::{SessionConfig, SessionContext}, }; -use datafusion_ext_plans::memmgr::MemManager; use jni::{ JNIEnv, objects::{JClass, JObject, JString}, diff --git a/native-engine/datafusion-ext-commons/Cargo.toml b/native-engine/datafusion-ext-commons/Cargo.toml index 3dc0c176..7ca7fe6a 100644 --- a/native-engine/datafusion-ext-commons/Cargo.toml +++ b/native-engine/datafusion-ext-commons/Cargo.toml @@ -29,24 +29,21 @@ arrow = { workspace = true } arrow-schema = { workspace = true } auron-jni-bridge = { workspace = true } datafusion = { workspace = true } - -async-trait = { workspace = true } bigdecimal = { workspace = true } byteorder = { workspace = true } -bytes = { workspace = true } chrono = { workspace = true } -futures = { workspace = true } itertools = { workspace = true } jni = { workspace = true } log = { workspace = true } +lz4_flex = { workspace = true } num = { workspace = true } once_cell = { workspace = true } paste = { workspace = true } smallvec = { workspace = true } -tempfile = { workspace = true } -transpose = { workspace = true } tokio = { workspace = true } +transpose = { workspace = true } unchecked-index = { workspace = true } +zstd = { workspace = true } [dev-dependencies] rand = { workspace = true } diff --git a/native-engine/datafusion-ext-plans/src/common/ipc_compression.rs b/native-engine/datafusion-ext-commons/src/io/ipc_compression.rs similarity index 99% rename from native-engine/datafusion-ext-plans/src/common/ipc_compression.rs rename to native-engine/datafusion-ext-commons/src/io/ipc_compression.rs index 6f0ebcff..d3ea0b29 100644 --- a/native-engine/datafusion-ext-plans/src/common/ipc_compression.rs +++ b/native-engine/datafusion-ext-commons/src/io/ipc_compression.rs @@ -25,11 +25,12 @@ use auron_jni_bridge::{ }; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use datafusion::common::Result; -use datafusion_ext_commons::{ +use once_cell::sync::OnceCell; + +use crate::{ df_execution_err, io::{read_one_batch, write_one_batch}, }; -use once_cell::sync::OnceCell; pub struct IpcCompressionWriter<W: Write> { output: W, diff --git a/native-engine/datafusion-ext-commons/src/io/mod.rs b/native-engine/datafusion-ext-commons/src/io/mod.rs index 243ee57d..a8160174 100644 --- a/native-engine/datafusion-ext-commons/src/io/mod.rs +++ b/native-engine/datafusion-ext-commons/src/io/mod.rs @@ -27,6 +27,7 @@ pub use scalar_serde::{read_scalar, write_scalar}; use crate::{UninitializedInit, arrow::cast::cast}; mod batch_serde; +pub mod ipc_compression; mod scalar_serde; pub fn write_one_batch(num_rows: usize, cols: &[ArrayRef], mut output: impl Write) -> Result<()> { diff --git a/native-engine/datafusion-ext-exprs/Cargo.toml b/native-engine/datafusion-ext-exprs/Cargo.toml index f9d60fc2..4387bbdf 100644 --- a/native-engine/datafusion-ext-exprs/Cargo.toml +++ b/native-engine/datafusion-ext-exprs/Cargo.toml @@ -23,7 +23,6 @@ resolver = "1" [dependencies] arrow = { workspace = true } -async-trait = "0.1.89" auron-jni-bridge = { workspace = true } datafusion = { workspace = true } datafusion-ext-commons = { workspace = true } @@ -31,6 +30,5 @@ datafusion-ext-commons = { workspace = true } itertools = { workspace = true } jni = { workspace = true } log = { workspace = true } -num = { workspace = true } once_cell = { workspace = true } parking_lot = { workspace = true } diff --git a/native-engine/datafusion-ext-functions/Cargo.toml b/native-engine/datafusion-ext-functions/Cargo.toml index 495e4c7a..98de05f8 100644 --- a/native-engine/datafusion-ext-functions/Cargo.toml +++ b/native-engine/datafusion-ext-functions/Cargo.toml @@ -23,7 +23,6 @@ resolver = "1" [dependencies] arrow = { workspace = true } -async-trait = "0.1.89" auron-jni-bridge = { workspace = true } datafusion = { workspace = true } datafusion-ext-commons = { workspace = true } @@ -33,4 +32,4 @@ log = { workspace = true } num = { workspace = true } paste = { workspace = true } serde_json = { workspace = true } -sonic-rs = { workspace = true } \ No newline at end of file +sonic-rs = { workspace = true } diff --git a/native-engine/datafusion-ext-plans/Cargo.toml b/native-engine/datafusion-ext-plans/Cargo.toml index d452ffe0..e6dff419 100644 --- a/native-engine/datafusion-ext-plans/Cargo.toml +++ b/native-engine/datafusion-ext-plans/Cargo.toml @@ -28,6 +28,7 @@ default = ["tokio/rt-multi-thread"] arrow = { workspace = true } arrow-schema = { workspace = true } auron-jni-bridge = { workspace = true } +auron-memmgr = { workspace = true } datafusion = { workspace = true } datafusion-datasource = { workspace = true } datafusion-datasource-parquet = { workspace = true } @@ -50,7 +51,6 @@ hashbrown = { workspace = true } itertools = { workspace = true } jni = { workspace = true } log = { workspace = true } -lz4_flex = { workspace = true } num = { workspace = true } object_store = { workspace = true } once_cell = { workspace = true } @@ -58,10 +58,8 @@ panic-message = { workspace = true } parking_lot = { workspace = true } paste = { workspace = true } smallvec = { workspace = true } -tempfile = { workspace = true } tokio = { workspace = true } unchecked-index = { workspace = true } -zstd = { workspace = true } [target.'cfg(target_os = "linux")'.dependencies] procfs = { workspace = true } diff --git a/native-engine/datafusion-ext-plans/src/agg/acc.rs b/native-engine/datafusion-ext-plans/src/agg/acc.rs index 1292c0c8..d3ce8049 100644 --- a/native-engine/datafusion-ext-plans/src/agg/acc.rs +++ b/native-engine/datafusion-ext-plans/src/agg/acc.rs @@ -23,6 +23,7 @@ use arrow::{ array::*, datatypes::{DataType, *}, }; +use auron_memmgr::spill::{SpillCompressedReader, SpillCompressedWriter}; use bitvec::{bitvec, vec::BitVec}; use byteorder::{ReadBytesExt, WriteBytesExt}; use datafusion::common::{Result, ScalarValue, utils::proxy::VecAllocExt}; @@ -33,11 +34,7 @@ use datafusion_ext_commons::{ }; use smallvec::SmallVec; -use crate::{ - agg::agg::IdxSelection, - idx_for, idx_with_iter, - memmgr::spill::{SpillCompressedReader, SpillCompressedWriter}, -}; +use crate::{agg::agg::IdxSelection, idx_for, idx_with_iter}; pub trait AccColumn: Send { fn as_any(&self) -> &dyn Any; diff --git a/native-engine/datafusion-ext-plans/src/agg/agg_table.rs b/native-engine/datafusion-ext-plans/src/agg/agg_table.rs index d2adb51b..2e433391 100644 --- a/native-engine/datafusion-ext-plans/src/agg/agg_table.rs +++ b/native-engine/datafusion-ext-plans/src/agg/agg_table.rs @@ -21,6 +21,10 @@ use std::{ use arrow::{record_batch::RecordBatch, row::Rows}; use async_trait::async_trait; +use auron_memmgr::{ + MemConsumer, MemConsumerInfo, MemManager, + spill::{Spill, SpillCompressedReader, SpillCompressedWriter, try_new_spill}, +}; use bytesize::ByteSize; use datafusion::{ common::{DataFusionError, Result}, @@ -52,10 +56,6 @@ use crate::{ execution_context::{ExecutionContext, WrappedRecordBatchSender}, timer_helper::TimerHelper, }, - memmgr::{ - MemConsumer, MemConsumerInfo, MemManager, - spill::{Spill, SpillCompressedReader, SpillCompressedWriter, try_new_spill}, - }, }; pub type OwnedKey = SmallVec<u8, 24>; diff --git a/native-engine/datafusion-ext-plans/src/agg/avg.rs b/native-engine/datafusion-ext-plans/src/agg/avg.rs index 7512b774..008dd34b 100644 --- a/native-engine/datafusion-ext-plans/src/agg/avg.rs +++ b/native-engine/datafusion-ext-plans/src/agg/avg.rs @@ -21,6 +21,7 @@ use std::{ }; use arrow::{array::*, datatypes::*}; +use auron_memmgr::spill::{SpillCompressedReader, SpillCompressedWriter}; use datafusion::{ common::{ Result, @@ -30,15 +31,12 @@ use datafusion::{ }; use datafusion_ext_commons::downcast_any; -use crate::{ - agg::{ - Agg, - acc::{AccColumn, AccColumnRef}, - agg::IdxSelection, - count::AggCount, - sum::AggSum, - }, - memmgr::spill::{SpillCompressedReader, SpillCompressedWriter}, +use crate::agg::{ + Agg, + acc::{AccColumn, AccColumnRef}, + agg::IdxSelection, + count::AggCount, + sum::AggSum, }; pub struct AggAvg { diff --git a/native-engine/datafusion-ext-plans/src/agg/bloom_filter.rs b/native-engine/datafusion-ext-plans/src/agg/bloom_filter.rs index c7b4dd9f..7c6f1da3 100644 --- a/native-engine/datafusion-ext-plans/src/agg/bloom_filter.rs +++ b/native-engine/datafusion-ext-plans/src/agg/bloom_filter.rs @@ -24,6 +24,7 @@ use arrow::{ array::{ArrayRef, AsArray, BinaryBuilder}, datatypes::{DataType, Int64Type}, }; +use auron_memmgr::spill::{SpillCompressedReader, SpillCompressedWriter}; use byteorder::{ReadBytesExt, WriteBytesExt}; use datafusion::{common::Result, physical_expr::PhysicalExprRef}; use datafusion_ext_commons::{ @@ -37,7 +38,6 @@ use crate::{ agg::IdxSelection, }, idx_for, idx_for_zipped, - memmgr::spill::{SpillCompressedReader, SpillCompressedWriter}, }; pub struct AggBloomFilter { diff --git a/native-engine/datafusion-ext-plans/src/agg/collect.rs b/native-engine/datafusion-ext-plans/src/agg/collect.rs index 0472eede..6ddeed83 100644 --- a/native-engine/datafusion-ext-plans/src/agg/collect.rs +++ b/native-engine/datafusion-ext-plans/src/agg/collect.rs @@ -23,6 +23,7 @@ use std::{ }; use arrow::{array::*, datatypes::*}; +use auron_memmgr::spill::{SpillCompressedReader, SpillCompressedWriter}; use datafusion::{ common::{Result, ScalarValue}, physical_expr::PhysicalExprRef, @@ -41,7 +42,6 @@ use crate::{ agg::{Agg, IdxSelection}, }, idx_for, idx_for_zipped, - memmgr::spill::{SpillCompressedReader, SpillCompressedWriter}, }; pub type AggCollectSet = AggGenericCollect<AccSetColumn>; @@ -650,10 +650,10 @@ fn acc_hash(value: impl AsRef<[u8]>) -> u64 { #[cfg(test)] mod tests { use arrow::datatypes::DataType; + use auron_memmgr::spill::Spill; use datafusion::common::ScalarValue; use super::*; - use crate::memmgr::spill::Spill; #[test] fn test_acc_set_append() { diff --git a/native-engine/datafusion-ext-plans/src/agg/count.rs b/native-engine/datafusion-ext-plans/src/agg/count.rs index 253bcf58..ab17c5d3 100644 --- a/native-engine/datafusion-ext-plans/src/agg/count.rs +++ b/native-engine/datafusion-ext-plans/src/agg/count.rs @@ -21,6 +21,7 @@ use std::{ }; use arrow::{array::*, datatypes::*}; +use auron_memmgr::spill::{SpillCompressedReader, SpillCompressedWriter}; use datafusion::{common::Result, physical_expr::PhysicalExprRef}; use datafusion_ext_commons::{ downcast_any, @@ -33,7 +34,6 @@ use crate::{ agg::{Agg, IdxSelection}, }, idx_for, idx_for_zipped, idx_with_iter, - memmgr::spill::{SpillCompressedReader, SpillCompressedWriter}, }; pub struct AggCount { diff --git a/native-engine/datafusion-ext-plans/src/agg/first.rs b/native-engine/datafusion-ext-plans/src/agg/first.rs index 494631c9..b4bbc115 100644 --- a/native-engine/datafusion-ext-plans/src/agg/first.rs +++ b/native-engine/datafusion-ext-plans/src/agg/first.rs @@ -21,6 +21,7 @@ use std::{ }; use arrow::{array::*, datatypes::*}; +use auron_memmgr::spill::{SpillCompressedReader, SpillCompressedWriter}; use datafusion::{ common::{Result, ScalarValue}, physical_expr::PhysicalExprRef, @@ -37,7 +38,6 @@ use crate::{ agg::IdxSelection, }, idx_for_zipped, - memmgr::spill::{SpillCompressedReader, SpillCompressedWriter}, }; pub struct AggFirst { diff --git a/native-engine/datafusion-ext-plans/src/agg/spark_udaf_wrapper.rs b/native-engine/datafusion-ext-plans/src/agg/spark_udaf_wrapper.rs index 2eda7766..deba5d34 100644 --- a/native-engine/datafusion-ext-plans/src/agg/spark_udaf_wrapper.rs +++ b/native-engine/datafusion-ext-plans/src/agg/spark_udaf_wrapper.rs @@ -30,6 +30,7 @@ use auron_jni_bridge::{ jni_bridge::LocalRef, jni_call, jni_get_byte_array_len, jni_get_byte_array_region, jni_new_direct_byte_buffer, jni_new_global_ref, jni_new_object, jni_new_prim_array, }; +use auron_memmgr::spill::{SpillCompressedReader, SpillCompressedWriter}; use datafusion::{ common::{DataFusionError, Result}, physical_expr::PhysicalExprRef, @@ -47,7 +48,6 @@ use crate::{ agg::{Agg, IdxSelection}, }, idx_for_zipped, - memmgr::spill::{SpillCompressedReader, SpillCompressedWriter}, }; pub struct SparkUDAFWrapper { diff --git a/native-engine/datafusion-ext-plans/src/agg_exec.rs b/native-engine/datafusion-ext-plans/src/agg_exec.rs index 80c828dc..a77babd0 100644 --- a/native-engine/datafusion-ext-plans/src/agg_exec.rs +++ b/native-engine/datafusion-ext-plans/src/agg_exec.rs @@ -24,6 +24,7 @@ use arrow::{ datatypes::SchemaRef, }; use auron_jni_bridge::conf::{IntConf, UDAF_FALLBACK_NUM_UDAFS_TRIGGER_SORT_AGG}; +use auron_memmgr::MemManager; use datafusion::{ common::{Result, Statistics}, error::DataFusionError, @@ -50,7 +51,6 @@ use crate::{ }, common::{execution_context::ExecutionContext, timer_helper::TimerHelper}, expand_exec::ExpandExec, - memmgr::MemManager, project_exec::ProjectExec, sort_exec::create_default_ascending_sort_exec, }; @@ -418,6 +418,7 @@ mod test { datatypes::{DataType, Field, Schema}, record_batch::RecordBatch, }; + use auron_memmgr::MemManager; use datafusion::{ assert_batches_sorted_eq, common::{Result, ScalarValue}, @@ -435,7 +436,6 @@ mod test { agg::create_agg, }, agg_exec::AggExec, - memmgr::MemManager, }; fn build_table_i32( @@ -691,6 +691,7 @@ mod fuzztest { datatypes::{DataType, Float64Type, Int64Type}, record_batch::RecordBatch, }; + use auron_memmgr::MemManager; use datafusion::{ common::Result, physical_expr::expressions as phys_expr, @@ -708,7 +709,6 @@ mod fuzztest { sum::AggSum, }, agg_exec::AggExec, - memmgr::MemManager, }; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] diff --git a/native-engine/datafusion-ext-plans/src/common/execution_context.rs b/native-engine/datafusion-ext-plans/src/common/execution_context.rs index b79b1381..15b6e69d 100644 --- a/native-engine/datafusion-ext-plans/src/common/execution_context.rs +++ b/native-engine/datafusion-ext-plans/src/common/execution_context.rs @@ -30,6 +30,7 @@ use arrow::{ }; use arrow_schema::Schema; use auron_jni_bridge::{conf, conf::BooleanConf, is_task_running}; +use auron_memmgr::metrics::SpillMetrics; use datafusion::{ common::{DataFusionError, Result}, execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}, @@ -62,7 +63,6 @@ use crate::{ }, timer_helper::TimerHelper, }, - memmgr::metrics::SpillMetrics, sort_exec::SortExec, }; diff --git a/native-engine/datafusion-ext-plans/src/common/mod.rs b/native-engine/datafusion-ext-plans/src/common/mod.rs index 2bbe09fc..def20537 100644 --- a/native-engine/datafusion-ext-plans/src/common/mod.rs +++ b/native-engine/datafusion-ext-plans/src/common/mod.rs @@ -16,7 +16,6 @@ pub mod cached_exprs_evaluator; pub mod column_pruning; pub mod execution_context; -pub mod ipc_compression; pub mod key_rows_output; pub mod offsetted; pub mod row_null_checker; diff --git a/native-engine/datafusion-ext-plans/src/expand_exec.rs b/native-engine/datafusion-ext-plans/src/expand_exec.rs index 8521eddc..9030e3cd 100644 --- a/native-engine/datafusion-ext-plans/src/expand_exec.rs +++ b/native-engine/datafusion-ext-plans/src/expand_exec.rs @@ -195,6 +195,7 @@ mod test { datatypes::{DataType, Field, Schema}, record_batch::RecordBatch, }; + use auron_memmgr::MemManager; use datafusion::{ assert_batches_eq, common::{Result, ScalarValue}, @@ -204,7 +205,7 @@ mod test { prelude::SessionContext, }; - use crate::{expand_exec::ExpandExec, memmgr::MemManager}; + use crate::expand_exec::ExpandExec; // build i32 table fn build_table_i32(a: (&str, &Vec<i32>)) -> RecordBatch { diff --git a/native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs b/native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs index a1e391d8..255e0e1f 100644 --- a/native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs +++ b/native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs @@ -51,13 +51,15 @@ use datafusion_ext_commons::{ array_size::{ArraySize, BatchSize}, coalesce::coalesce_arrays_unchecked, }, - batch_size, df_execution_err, suggested_batch_mem_size, + batch_size, df_execution_err, + io::ipc_compression::IpcCompressionReader, + suggested_batch_mem_size, }; use jni::objects::{GlobalRef, JObject}; use once_cell::sync::OnceCell; use parking_lot::Mutex; -use crate::common::{execution_context::ExecutionContext, ipc_compression::IpcCompressionReader}; +use crate::common::execution_context::ExecutionContext; #[derive(Debug, Clone)] pub struct IpcReaderExec { diff --git a/native-engine/datafusion-ext-plans/src/ipc_writer_exec.rs b/native-engine/datafusion-ext-plans/src/ipc_writer_exec.rs index a28dfc77..7c840fd7 100644 --- a/native-engine/datafusion-ext-plans/src/ipc_writer_exec.rs +++ b/native-engine/datafusion-ext-plans/src/ipc_writer_exec.rs @@ -32,14 +32,12 @@ use datafusion::{ stream::RecordBatchStreamAdapter, }, }; +use datafusion_ext_commons::io::ipc_compression::IpcCompressionWriter; use futures::{StreamExt, TryStreamExt, stream::once}; use jni::objects::{GlobalRef, JObject}; use once_cell::sync::OnceCell; -use crate::common::{ - execution_context::ExecutionContext, ipc_compression::IpcCompressionWriter, - timer_helper::TimerHelper, -}; +use crate::common::{execution_context::ExecutionContext, timer_helper::TimerHelper}; #[derive(Debug)] pub struct IpcWriterExec { diff --git a/native-engine/datafusion-ext-plans/src/joins/test.rs b/native-engine/datafusion-ext-plans/src/joins/test.rs index 06d2ad59..9125ed53 100644 --- a/native-engine/datafusion-ext-plans/src/joins/test.rs +++ b/native-engine/datafusion-ext-plans/src/joins/test.rs @@ -25,6 +25,7 @@ mod tests { datatypes::{DataType, Field, Schema, SchemaRef}, record_batch::RecordBatch, }; + use auron_memmgr::MemManager; use datafusion::{ assert_batches_sorted_eq, common::JoinSide, @@ -38,7 +39,6 @@ mod tests { broadcast_join_build_hash_map_exec::BroadcastJoinBuildHashMapExec, broadcast_join_exec::BroadcastJoinExec, joins::join_utils::{JoinType, JoinType::*}, - memmgr::MemManager, sort_merge_join_exec::SortMergeJoinExec, }; diff --git a/native-engine/datafusion-ext-plans/src/lib.rs b/native-engine/datafusion-ext-plans/src/lib.rs index 6d510ce2..f85f3b26 100644 --- a/native-engine/datafusion-ext-plans/src/lib.rs +++ b/native-engine/datafusion-ext-plans/src/lib.rs @@ -48,9 +48,6 @@ pub mod sort_merge_join_exec; pub mod union_exec; pub mod window_exec; -// memory management -pub mod memmgr; - // helper modules pub mod common; pub mod generate; diff --git a/native-engine/datafusion-ext-plans/src/limit_exec.rs b/native-engine/datafusion-ext-plans/src/limit_exec.rs index 81110456..1b9f5892 100644 --- a/native-engine/datafusion-ext-plans/src/limit_exec.rs +++ b/native-engine/datafusion-ext-plans/src/limit_exec.rs @@ -153,6 +153,7 @@ mod test { datatypes::{DataType, Field, Schema}, record_batch::RecordBatch, }; + use auron_memmgr::MemManager; use datafusion::{ assert_batches_eq, common::{Result, stats::Precision}, @@ -160,7 +161,7 @@ mod test { prelude::SessionContext, }; - use crate::{limit_exec::LimitExec, memmgr::MemManager}; + use crate::limit_exec::LimitExec; fn build_table_i32( a: (&str, &Vec<i32>), diff --git a/native-engine/datafusion-ext-plans/src/rss_shuffle_writer_exec.rs b/native-engine/datafusion-ext-plans/src/rss_shuffle_writer_exec.rs index 22c9687f..bad86413 100644 --- a/native-engine/datafusion-ext-plans/src/rss_shuffle_writer_exec.rs +++ b/native-engine/datafusion-ext-plans/src/rss_shuffle_writer_exec.rs @@ -19,6 +19,7 @@ use std::{any::Any, fmt::Debug, sync::Arc}; use async_trait::async_trait; use auron_jni_bridge::{jni_call_static, jni_new_global_ref, jni_new_string}; +use auron_memmgr::MemManager; use datafusion::{ arrow::datatypes::SchemaRef, error::{DataFusionError, Result}, @@ -36,7 +37,6 @@ use once_cell::sync::OnceCell; use crate::{ common::execution_context::ExecutionContext, - memmgr::MemManager, shuffle::{ Partitioning, ShuffleRepartitioner, rss_single_repartitioner::RssSingleShuffleRepartitioner, diff --git a/native-engine/datafusion-ext-plans/src/shuffle/buffered_data.rs b/native-engine/datafusion-ext-plans/src/shuffle/buffered_data.rs index d6df4f19..6b88ba8f 100644 --- a/native-engine/datafusion-ext-plans/src/shuffle/buffered_data.rs +++ b/native-engine/datafusion-ext-plans/src/shuffle/buffered_data.rs @@ -27,6 +27,7 @@ use datafusion_ext_commons::{ selection::{BatchInterleaver, create_batch_interleaver}, }, compute_suggested_batch_size_for_output, df_execution_err, + io::ipc_compression::IpcCompressionWriter, }; use itertools::Itertools; use jni::objects::GlobalRef; @@ -35,7 +36,6 @@ use parking_lot::Mutex; use crate::{ common::{ - ipc_compression::IpcCompressionWriter, offsetted::{Offsetted, OffsettedMergeIterator}, timer_helper::TimerHelper, }, diff --git a/native-engine/datafusion-ext-plans/src/shuffle/rss_single_repartitioner.rs b/native-engine/datafusion-ext-plans/src/shuffle/rss_single_repartitioner.rs index ca7ee73d..a41f5516 100644 --- a/native-engine/datafusion-ext-plans/src/shuffle/rss_single_repartitioner.rs +++ b/native-engine/datafusion-ext-plans/src/shuffle/rss_single_repartitioner.rs @@ -17,14 +17,11 @@ use std::sync::Arc; use async_trait::async_trait; use datafusion::{arrow::record_batch::RecordBatch, common::Result, physical_plan::metrics::Time}; -use datafusion_ext_commons::df_execution_err; +use datafusion_ext_commons::{df_execution_err, io::ipc_compression::IpcCompressionWriter}; use jni::objects::GlobalRef; use parking_lot::Mutex; -use crate::{ - common::ipc_compression::IpcCompressionWriter, - shuffle::{ShuffleRepartitioner, rss::RssWriter}, -}; +use crate::shuffle::{ShuffleRepartitioner, rss::RssWriter}; pub struct RssSingleShuffleRepartitioner { rss_partition_writer: Arc<Mutex<IpcCompressionWriter<RssWriter>>>, diff --git a/native-engine/datafusion-ext-plans/src/shuffle/rss_sort_repartitioner.rs b/native-engine/datafusion-ext-plans/src/shuffle/rss_sort_repartitioner.rs index 58d3ed0d..b0b2c825 100644 --- a/native-engine/datafusion-ext-plans/src/shuffle/rss_sort_repartitioner.rs +++ b/native-engine/datafusion-ext-plans/src/shuffle/rss_sort_repartitioner.rs @@ -17,15 +17,13 @@ use std::sync::Weak; use arrow::record_batch::RecordBatch; use async_trait::async_trait; +use auron_memmgr::{MemConsumer, MemConsumerInfo, MemManager}; use datafusion::{common::Result, physical_plan::metrics::Time}; use datafusion_ext_commons::arrow::array_size::BatchSize; use futures::lock::Mutex; use jni::objects::GlobalRef; -use crate::{ - memmgr::{MemConsumer, MemConsumerInfo, MemManager}, - shuffle::{Partitioning, ShuffleRepartitioner, buffered_data::BufferedData}, -}; +use crate::shuffle::{Partitioning, ShuffleRepartitioner, buffered_data::BufferedData}; pub struct RssSortShuffleRepartitioner { mem_consumer_info: Option<Weak<MemConsumerInfo>>, diff --git a/native-engine/datafusion-ext-plans/src/shuffle/single_repartitioner.rs b/native-engine/datafusion-ext-plans/src/shuffle/single_repartitioner.rs index bfe87512..d5f4f31d 100644 --- a/native-engine/datafusion-ext-plans/src/shuffle/single_repartitioner.rs +++ b/native-engine/datafusion-ext-plans/src/shuffle/single_repartitioner.rs @@ -22,13 +22,11 @@ use std::{ use arrow::record_batch::RecordBatch; use async_trait::async_trait; use datafusion::{common::Result, physical_plan::metrics::Time}; +use datafusion_ext_commons::io::ipc_compression::IpcCompressionWriter; use tokio::sync::Mutex; use crate::{ - common::{ - ipc_compression::IpcCompressionWriter, - timer_helper::{TimedWriter, TimerHelper}, - }, + common::timer_helper::{TimedWriter, TimerHelper}, shuffle::{ShuffleRepartitioner, open_shuffle_file}, }; diff --git a/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs b/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs index 169ad212..b76ed631 100644 --- a/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs +++ b/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs @@ -20,6 +20,10 @@ use std::{ use arrow::record_batch::RecordBatch; use async_trait::async_trait; +use auron_memmgr::{ + MemConsumer, MemConsumerInfo, MemManager, + spill::{OwnedSpillBufReader, Spill, try_new_spill}, +}; use bytesize::ByteSize; use datafusion::{ common::{DataFusionError, Result}, @@ -34,10 +38,6 @@ use crate::{ offsetted::{Offsetted, OffsettedMergeIterator}, timer_helper::TimerHelper, }, - memmgr::{ - MemConsumer, MemConsumerInfo, MemManager, - spill::{OwnedSpillBufReader, Spill, try_new_spill}, - }, shuffle::{Partitioning, ShuffleRepartitioner, buffered_data::BufferedData, open_shuffle_file}, }; diff --git a/native-engine/datafusion-ext-plans/src/shuffle_writer_exec.rs b/native-engine/datafusion-ext-plans/src/shuffle_writer_exec.rs index afa16fd4..32c2dc14 100644 --- a/native-engine/datafusion-ext-plans/src/shuffle_writer_exec.rs +++ b/native-engine/datafusion-ext-plans/src/shuffle_writer_exec.rs @@ -19,6 +19,7 @@ use std::{any::Any, fmt::Debug, sync::Arc}; use arrow::datatypes::SchemaRef; use async_trait::async_trait; +use auron_memmgr::MemManager; use datafusion::{ error::Result, execution::context::TaskContext, @@ -36,7 +37,6 @@ use once_cell::sync::OnceCell; use crate::{ common::execution_context::ExecutionContext, - memmgr::MemManager, shuffle::{ Partitioning, ShuffleRepartitioner, single_repartitioner::SingleShuffleRepartitioner, sort_repartitioner::SortShuffleRepartitioner, diff --git a/native-engine/datafusion-ext-plans/src/sort_exec.rs b/native-engine/datafusion-ext-plans/src/sort_exec.rs index 2a201cdf..24d0beb6 100644 --- a/native-engine/datafusion-ext-plans/src/sort_exec.rs +++ b/native-engine/datafusion-ext-plans/src/sort_exec.rs @@ -33,6 +33,10 @@ use arrow::{ row::{RowConverter, Rows, SortField}, }; use async_trait::async_trait; +use auron_memmgr::{ + MemConsumer, MemConsumerInfo, MemManager, + spill::{Spill, SpillCompressedReader, SpillCompressedWriter, try_new_spill}, +}; use bytesize::ByteSize; use datafusion::{ common::{DataFusionError, Result, Statistics, utils::proxy::VecAllocExt}, @@ -62,19 +66,13 @@ use itertools::Itertools; use once_cell::sync::OnceCell; use parking_lot::Mutex; -use crate::{ - common::{ - execution_context::{ - ExecutionContext, WrappedRecordBatchSender, WrappedRecordBatchWithKeyRowsSender, - WrappedSenderTrait, - }, - key_rows_output::{RecordBatchWithKeyRows, SendableRecordBatchWithKeyRowsStream}, - timer_helper::TimerHelper, - }, - memmgr::{ - MemConsumer, MemConsumerInfo, MemManager, - spill::{Spill, SpillCompressedReader, SpillCompressedWriter, try_new_spill}, +use crate::common::{ + execution_context::{ + ExecutionContext, WrappedRecordBatchSender, WrappedRecordBatchWithKeyRowsSender, + WrappedSenderTrait, }, + key_rows_output::{RecordBatchWithKeyRows, SendableRecordBatchWithKeyRowsStream}, + timer_helper::TimerHelper, }; // reserve memory for each spill @@ -1412,6 +1410,7 @@ mod test { datatypes::{DataType, Field, Schema}, record_batch::RecordBatch, }; + use auron_memmgr::MemManager; use datafusion::{ assert_batches_eq, common::Result, @@ -1420,7 +1419,7 @@ mod test { prelude::SessionContext, }; - use crate::{memmgr::MemManager, sort_exec::SortExec}; + use crate::sort_exec::SortExec; fn build_table_i32( a: (&str, &Vec<i32>), @@ -1499,6 +1498,7 @@ mod fuzztest { compute::{SortOptions, concat_batches}, record_batch::RecordBatch, }; + use auron_memmgr::MemManager; use datafusion::{ common::{Result, stats::Precision}, physical_expr::{LexOrdering, PhysicalSortExpr, expressions::Column}, @@ -1506,7 +1506,7 @@ mod fuzztest { prelude::{SessionConfig, SessionContext}, }; - use crate::{memmgr::MemManager, sort_exec::SortExec}; + use crate::sort_exec::SortExec; #[tokio::test] async fn fuzztest_in_mem_sorting() -> Result<()> {
