This is an automated email from the ASF dual-hosted git repository.

richox pushed a commit to branch dev-split-memmgr
in repository https://gitbox.apache.org/repos/asf/auron.git

commit 5a49626d6686e26dd654302b783848ca00003961
Author: zhangli20 <[email protected]>
AuthorDate: Fri Oct 24 11:58:39 2025 +0800

    split memory manager to module auron-memmgr
---
 Cargo.lock                                         | 34 +++++++++++++---------
 Cargo.toml                                         |  2 ++
 native-engine/auron-memmgr/Cargo.toml              | 20 +++++++++++++
 .../src/memmgr/mod.rs => auron-memmgr/src/lib.rs}  |  2 ++
 .../src/memmgr => auron-memmgr/src}/metrics.rs     |  0
 .../src/memmgr => auron-memmgr/src}/spill.rs       |  6 ++--
 native-engine/auron-serde/Cargo.toml               |  4 +--
 native-engine/auron/Cargo.toml                     |  2 +-
 native-engine/auron/src/exec.rs                    |  2 +-
 native-engine/datafusion-ext-commons/Cargo.toml    |  9 ++----
 .../src/io}/ipc_compression.rs                     |  5 ++--
 native-engine/datafusion-ext-commons/src/io/mod.rs |  1 +
 native-engine/datafusion-ext-exprs/Cargo.toml      |  2 --
 native-engine/datafusion-ext-functions/Cargo.toml  |  3 +-
 native-engine/datafusion-ext-plans/Cargo.toml      |  4 +--
 native-engine/datafusion-ext-plans/src/agg/acc.rs  |  7 ++---
 .../datafusion-ext-plans/src/agg/agg_table.rs      |  8 ++---
 native-engine/datafusion-ext-plans/src/agg/avg.rs  | 16 +++++-----
 .../datafusion-ext-plans/src/agg/bloom_filter.rs   |  2 +-
 .../datafusion-ext-plans/src/agg/collect.rs        |  4 +--
 .../datafusion-ext-plans/src/agg/count.rs          |  2 +-
 .../datafusion-ext-plans/src/agg/first.rs          |  2 +-
 .../src/agg/spark_udaf_wrapper.rs                  |  2 +-
 native-engine/datafusion-ext-plans/src/agg_exec.rs |  6 ++--
 .../src/common/execution_context.rs                |  2 +-
 .../datafusion-ext-plans/src/common/mod.rs         |  1 -
 .../datafusion-ext-plans/src/expand_exec.rs        |  3 +-
 .../datafusion-ext-plans/src/ipc_reader_exec.rs    |  6 ++--
 .../datafusion-ext-plans/src/ipc_writer_exec.rs    |  6 ++--
 .../datafusion-ext-plans/src/joins/test.rs         |  2 +-
 native-engine/datafusion-ext-plans/src/lib.rs      |  3 --
 .../datafusion-ext-plans/src/limit_exec.rs         |  3 +-
 .../src/rss_shuffle_writer_exec.rs                 |  2 +-
 .../src/shuffle/buffered_data.rs                   |  2 +-
 .../src/shuffle/rss_single_repartitioner.rs        |  7 ++---
 .../src/shuffle/rss_sort_repartitioner.rs          |  6 ++--
 .../src/shuffle/single_repartitioner.rs            |  6 ++--
 .../src/shuffle/sort_repartitioner.rs              |  8 ++---
 .../src/shuffle_writer_exec.rs                     |  2 +-
 .../datafusion-ext-plans/src/sort_exec.rs          | 28 +++++++++---------
 40 files changed, 120 insertions(+), 112 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 730901bb..0998d37d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -348,8 +348,8 @@ version = "0.1.0"
 dependencies = [
  "arrow",
  "auron-jni-bridge",
+ "auron-memmgr",
  "auron-serde",
- "bytesize",
  "chrono",
  "datafusion",
  "datafusion-ext-commons",
@@ -381,6 +381,23 @@ dependencies = [
  "paste",
 ]
 
+[[package]]
+name = "auron-memmgr"
+version = "0.1.0"
+dependencies = [
+ "async-trait",
+ "auron-jni-bridge",
+ "bytesize",
+ "datafusion",
+ "datafusion-ext-commons",
+ "jni",
+ "log",
+ "once_cell",
+ "parking_lot",
+ "procfs",
+ "tempfile",
+]
+
 [[package]]
 name = "auron-serde"
 version = "0.1.0"
@@ -388,12 +405,10 @@ dependencies = [
  "arrow",
  "base64",
  "datafusion",
- "datafusion-ext-commons",
  "datafusion-ext-exprs",
  "datafusion-ext-functions",
  "datafusion-ext-plans",
  "datafusion-spark",
- "log",
  "object_store",
  "parking_lot",
  "prost 0.14.1",
@@ -1162,26 +1177,24 @@ version = "0.1.0"
 dependencies = [
  "arrow",
  "arrow-schema",
- "async-trait",
  "auron-jni-bridge",
  "bigdecimal",
  "byteorder",
- "bytes 1.10.1",
  "chrono",
  "datafusion",
- "futures",
  "itertools 0.14.0",
  "jni",
  "log",
+ "lz4_flex",
  "num",
  "once_cell",
  "paste",
  "rand",
  "smallvec 2.0.0-alpha.11",
- "tempfile",
  "tokio",
  "transpose",
  "unchecked-index",
+ "zstd",
 ]
 
 [[package]]
@@ -1189,14 +1202,12 @@ name = "datafusion-ext-exprs"
 version = "0.1.0"
 dependencies = [
  "arrow",
- "async-trait",
  "auron-jni-bridge",
  "datafusion",
  "datafusion-ext-commons",
  "itertools 0.14.0",
  "jni",
  "log",
- "num",
  "once_cell",
  "parking_lot",
 ]
@@ -1206,7 +1217,6 @@ name = "datafusion-ext-functions"
 version = "0.1.0"
 dependencies = [
  "arrow",
- "async-trait",
  "auron-jni-bridge",
  "datafusion",
  "datafusion-ext-commons",
@@ -1226,6 +1236,7 @@ dependencies = [
  "arrow-schema",
  "async-trait",
  "auron-jni-bridge",
+ "auron-memmgr",
  "base64",
  "bitvec",
  "byteorder",
@@ -1245,7 +1256,6 @@ dependencies = [
  "itertools 0.14.0",
  "jni",
  "log",
- "lz4_flex",
  "num",
  "object_store",
  "once_cell",
@@ -1256,10 +1266,8 @@ dependencies = [
  "procfs",
  "rand",
  "smallvec 2.0.0-alpha.11",
- "tempfile",
  "tokio",
  "unchecked-index",
- "zstd",
 ]
 
 [[package]]
diff --git a/Cargo.toml b/Cargo.toml
index beb2c99a..085e2feb 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -24,6 +24,7 @@ members = [
     "native-engine/auron",
     "native-engine/auron-jni-bridge",
     "native-engine/auron-serde",
+    "native-engine/auron-memmgr",
 ]
 
 [profile.release]
@@ -48,6 +49,7 @@ overflow-checks = false
 auron = { path = "./native-engine/auron" }
 auron-jni-bridge = { path = "./native-engine/auron-jni-bridge" }
 auron-serde = { path = "./native-engine/auron-serde" }
+auron-memmgr = { path = "./native-engine/auron-memmgr" }
 datafusion-ext-commons = { path = "./native-engine/datafusion-ext-commons" }
 datafusion-ext-exprs = { path = "./native-engine/datafusion-ext-exprs" }
 datafusion-ext-functions = { path = "./native-engine/datafusion-ext-functions" }
diff --git a/native-engine/auron-memmgr/Cargo.toml b/native-engine/auron-memmgr/Cargo.toml
new file mode 100644
index 00000000..a5b6f4c1
--- /dev/null
+++ b/native-engine/auron-memmgr/Cargo.toml
@@ -0,0 +1,20 @@
+[package]
+name = "auron-memmgr"
+version = "0.1.0"
+edition = "2024"
+
+[dependencies]
+auron-jni-bridge = { workspace = true }
+datafusion = { workspace = true }
+datafusion-ext-commons = { workspace = true }
+
+async-trait = { workspace = true }
+bytesize = { workspace = true }
+jni = { workspace = true }
+log = { workspace = true }
+once_cell = { workspace = true }
+tempfile = { workspace = true }
+parking_lot = { workspace = true }
+
+[target.'cfg(target_os = "linux")'.dependencies]
+procfs = { workspace = true }
diff --git a/native-engine/datafusion-ext-plans/src/memmgr/mod.rs b/native-engine/auron-memmgr/src/lib.rs
similarity index 99%
rename from native-engine/datafusion-ext-plans/src/memmgr/mod.rs
rename to native-engine/auron-memmgr/src/lib.rs
index cb28f09c..e9cb9031 100644
--- a/native-engine/datafusion-ext-plans/src/memmgr/mod.rs
+++ b/native-engine/auron-memmgr/src/lib.rs
@@ -13,6 +13,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#![feature(get_mut_unchecked)]
+
 pub mod metrics;
 pub mod spill;
 
diff --git a/native-engine/datafusion-ext-plans/src/memmgr/metrics.rs b/native-engine/auron-memmgr/src/metrics.rs
similarity index 100%
rename from native-engine/datafusion-ext-plans/src/memmgr/metrics.rs
rename to native-engine/auron-memmgr/src/metrics.rs
diff --git a/native-engine/datafusion-ext-plans/src/memmgr/spill.rs b/native-engine/auron-memmgr/src/spill.rs
similarity index 98%
rename from native-engine/datafusion-ext-plans/src/memmgr/spill.rs
rename to native-engine/auron-memmgr/src/spill.rs
index e3b8be07..3e7ef0b4 100644
--- a/native-engine/datafusion-ext-plans/src/memmgr/spill.rs
+++ b/native-engine/auron-memmgr/src/spill.rs
@@ -27,14 +27,12 @@ use auron_jni_bridge::{
     jni_get_string, jni_new_direct_byte_buffer, jni_new_global_ref,
 };
 use datafusion::{common::Result, parquet::file::reader::Length, physical_plan::metrics::Time};
+use datafusion_ext_commons::io::ipc_compression::{IoCompressionReader, IoCompressionWriter};
 use jni::{objects::GlobalRef, sys::jlong};
 use log::warn;
 use once_cell::sync::OnceCell;
 
-use crate::{
-    common::ipc_compression::{IoCompressionReader, IoCompressionWriter},
-    memmgr::metrics::SpillMetrics,
-};
+use crate::metrics::SpillMetrics;
 
 pub type SpillCompressedReader<'a> = IoCompressionReader<BufReader<Box<dyn Read + Send + 'a>>>;
 pub type SpillCompressedWriter<'a> = IoCompressionWriter<BufWriter<Box<dyn Write + Send + 'a>>>;
diff --git a/native-engine/auron-serde/Cargo.toml 
b/native-engine/auron-serde/Cargo.toml
index 12037e03..adb63dbf 100644
--- a/native-engine/auron-serde/Cargo.toml
+++ b/native-engine/auron-serde/Cargo.toml
@@ -26,17 +26,15 @@ default = ["prost/no-recursion-limit"]
 [dependencies]
 arrow = { workspace = true }
 datafusion = { workspace = true }
-datafusion-ext-commons = { workspace = true }
 datafusion-ext-exprs = { workspace = true }
 datafusion-ext-functions = { workspace = true }
 datafusion-ext-plans = { workspace = true }
 datafusion-spark = { workspace = true }
 
 base64 = { workspace = true }
-log = { workspace = true }
 object_store = { workspace = true }
-prost = { workspace = true }
 parking_lot = { workspace = true }
+prost = { workspace = true }
 
 [build-dependencies]
 tonic-build = { workspace = true }
diff --git a/native-engine/auron/Cargo.toml b/native-engine/auron/Cargo.toml
index 0afd6dbc..9e3f1353 100644
--- a/native-engine/auron/Cargo.toml
+++ b/native-engine/auron/Cargo.toml
@@ -33,12 +33,12 @@ http-service = []
 [dependencies]
 arrow = { workspace = true }
 auron-jni-bridge = { workspace = true }
+auron-memmgr = { workspace = true }
 auron-serde = { workspace = true }
 datafusion = { workspace = true }
 datafusion-ext-commons = { workspace = true }
 datafusion-ext-plans = { workspace = true }
 
-bytesize = { workspace = true }
 futures = { workspace = true }
 jni = { workspace = true }
 log = { workspace = true }
diff --git a/native-engine/auron/src/exec.rs b/native-engine/auron/src/exec.rs
index d8e4bdf8..5a6eb43c 100644
--- a/native-engine/auron/src/exec.rs
+++ b/native-engine/auron/src/exec.rs
@@ -18,6 +18,7 @@ use auron_jni_bridge::{
     jni_bridge::JavaClasses,
     *,
 };
+use auron_memmgr::MemManager;
 use datafusion::{
     common::Result,
     error::DataFusionError,
@@ -28,7 +29,6 @@ use datafusion::{
     },
     prelude::{SessionConfig, SessionContext},
 };
-use datafusion_ext_plans::memmgr::MemManager;
 use jni::{
     JNIEnv,
     objects::{JClass, JObject, JString},
diff --git a/native-engine/datafusion-ext-commons/Cargo.toml 
b/native-engine/datafusion-ext-commons/Cargo.toml
index 3dc0c176..7ca7fe6a 100644
--- a/native-engine/datafusion-ext-commons/Cargo.toml
+++ b/native-engine/datafusion-ext-commons/Cargo.toml
@@ -29,24 +29,21 @@ arrow = { workspace = true }
 arrow-schema = { workspace = true }
 auron-jni-bridge = { workspace = true }
 datafusion = { workspace = true }
-
-async-trait = { workspace = true }
 bigdecimal = { workspace = true }
 byteorder = { workspace = true }
-bytes = { workspace = true }
 chrono = { workspace = true }
-futures = { workspace = true }
 itertools = { workspace = true }
 jni = { workspace = true }
 log = { workspace = true }
+lz4_flex = { workspace = true }
 num = { workspace = true }
 once_cell = { workspace = true }
 paste = { workspace = true }
 smallvec = { workspace = true }
-tempfile = { workspace = true }
-transpose = { workspace = true }
 tokio = { workspace = true }
+transpose = { workspace = true }
 unchecked-index = { workspace = true }
+zstd = { workspace = true }
 
 [dev-dependencies]
 rand = { workspace = true }
diff --git a/native-engine/datafusion-ext-plans/src/common/ipc_compression.rs 
b/native-engine/datafusion-ext-commons/src/io/ipc_compression.rs
similarity index 99%
rename from native-engine/datafusion-ext-plans/src/common/ipc_compression.rs
rename to native-engine/datafusion-ext-commons/src/io/ipc_compression.rs
index 6f0ebcff..d3ea0b29 100644
--- a/native-engine/datafusion-ext-plans/src/common/ipc_compression.rs
+++ b/native-engine/datafusion-ext-commons/src/io/ipc_compression.rs
@@ -25,11 +25,12 @@ use auron_jni_bridge::{
 };
 use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
 use datafusion::common::Result;
-use datafusion_ext_commons::{
+use once_cell::sync::OnceCell;
+
+use crate::{
     df_execution_err,
     io::{read_one_batch, write_one_batch},
 };
-use once_cell::sync::OnceCell;
 
 pub struct IpcCompressionWriter<W: Write> {
     output: W,
diff --git a/native-engine/datafusion-ext-commons/src/io/mod.rs 
b/native-engine/datafusion-ext-commons/src/io/mod.rs
index 243ee57d..a8160174 100644
--- a/native-engine/datafusion-ext-commons/src/io/mod.rs
+++ b/native-engine/datafusion-ext-commons/src/io/mod.rs
@@ -27,6 +27,7 @@ pub use scalar_serde::{read_scalar, write_scalar};
 use crate::{UninitializedInit, arrow::cast::cast};
 
 mod batch_serde;
+pub mod ipc_compression;
 mod scalar_serde;
 
 pub fn write_one_batch(num_rows: usize, cols: &[ArrayRef], mut output: impl Write) -> Result<()> {
diff --git a/native-engine/datafusion-ext-exprs/Cargo.toml 
b/native-engine/datafusion-ext-exprs/Cargo.toml
index f9d60fc2..4387bbdf 100644
--- a/native-engine/datafusion-ext-exprs/Cargo.toml
+++ b/native-engine/datafusion-ext-exprs/Cargo.toml
@@ -23,7 +23,6 @@ resolver = "1"
 
 [dependencies]
 arrow = { workspace = true }
-async-trait = "0.1.89"
 auron-jni-bridge = { workspace = true }
 datafusion = { workspace = true }
 datafusion-ext-commons = { workspace = true }
@@ -31,6 +30,5 @@ datafusion-ext-commons = { workspace = true }
 itertools = { workspace = true }
 jni = { workspace = true }
 log = { workspace = true }
-num = { workspace = true }
 once_cell = { workspace = true }
 parking_lot = { workspace = true }
diff --git a/native-engine/datafusion-ext-functions/Cargo.toml 
b/native-engine/datafusion-ext-functions/Cargo.toml
index 495e4c7a..98de05f8 100644
--- a/native-engine/datafusion-ext-functions/Cargo.toml
+++ b/native-engine/datafusion-ext-functions/Cargo.toml
@@ -23,7 +23,6 @@ resolver = "1"
 
 [dependencies]
 arrow = { workspace = true }
-async-trait = "0.1.89"
 auron-jni-bridge = { workspace = true }
 datafusion = { workspace = true }
 datafusion-ext-commons = { workspace = true }
@@ -33,4 +32,4 @@ log = { workspace = true }
 num = { workspace = true }
 paste = { workspace = true }
 serde_json = { workspace = true }
-sonic-rs = { workspace = true }
\ No newline at end of file
+sonic-rs = { workspace = true }
diff --git a/native-engine/datafusion-ext-plans/Cargo.toml 
b/native-engine/datafusion-ext-plans/Cargo.toml
index d452ffe0..e6dff419 100644
--- a/native-engine/datafusion-ext-plans/Cargo.toml
+++ b/native-engine/datafusion-ext-plans/Cargo.toml
@@ -28,6 +28,7 @@ default = ["tokio/rt-multi-thread"]
 arrow = { workspace = true }
 arrow-schema = { workspace = true }
 auron-jni-bridge = { workspace = true }
+auron-memmgr = { workspace = true }
 datafusion = { workspace = true }
 datafusion-datasource = { workspace = true }
 datafusion-datasource-parquet = { workspace = true }
@@ -50,7 +51,6 @@ hashbrown = { workspace = true }
 itertools = { workspace = true }
 jni = { workspace = true }
 log = { workspace = true }
-lz4_flex = { workspace = true }
 num = { workspace = true }
 object_store = { workspace = true }
 once_cell = { workspace = true }
@@ -58,10 +58,8 @@ panic-message = { workspace = true }
 parking_lot = { workspace = true }
 paste = { workspace = true }
 smallvec = { workspace = true }
-tempfile = { workspace = true }
 tokio = { workspace = true }
 unchecked-index = { workspace = true }
-zstd = { workspace = true }
 
 [target.'cfg(target_os = "linux")'.dependencies]
 procfs = { workspace = true }
diff --git a/native-engine/datafusion-ext-plans/src/agg/acc.rs 
b/native-engine/datafusion-ext-plans/src/agg/acc.rs
index 1292c0c8..d3ce8049 100644
--- a/native-engine/datafusion-ext-plans/src/agg/acc.rs
+++ b/native-engine/datafusion-ext-plans/src/agg/acc.rs
@@ -23,6 +23,7 @@ use arrow::{
     array::*,
     datatypes::{DataType, *},
 };
+use auron_memmgr::spill::{SpillCompressedReader, SpillCompressedWriter};
 use bitvec::{bitvec, vec::BitVec};
 use byteorder::{ReadBytesExt, WriteBytesExt};
 use datafusion::common::{Result, ScalarValue, utils::proxy::VecAllocExt};
@@ -33,11 +34,7 @@ use datafusion_ext_commons::{
 };
 use smallvec::SmallVec;
 
-use crate::{
-    agg::agg::IdxSelection,
-    idx_for, idx_with_iter,
-    memmgr::spill::{SpillCompressedReader, SpillCompressedWriter},
-};
+use crate::{agg::agg::IdxSelection, idx_for, idx_with_iter};
 
 pub trait AccColumn: Send {
     fn as_any(&self) -> &dyn Any;
diff --git a/native-engine/datafusion-ext-plans/src/agg/agg_table.rs 
b/native-engine/datafusion-ext-plans/src/agg/agg_table.rs
index d2adb51b..2e433391 100644
--- a/native-engine/datafusion-ext-plans/src/agg/agg_table.rs
+++ b/native-engine/datafusion-ext-plans/src/agg/agg_table.rs
@@ -21,6 +21,10 @@ use std::{
 
 use arrow::{record_batch::RecordBatch, row::Rows};
 use async_trait::async_trait;
+use auron_memmgr::{
+    MemConsumer, MemConsumerInfo, MemManager,
+    spill::{Spill, SpillCompressedReader, SpillCompressedWriter, 
try_new_spill},
+};
 use bytesize::ByteSize;
 use datafusion::{
     common::{DataFusionError, Result},
@@ -52,10 +56,6 @@ use crate::{
         execution_context::{ExecutionContext, WrappedRecordBatchSender},
         timer_helper::TimerHelper,
     },
-    memmgr::{
-        MemConsumer, MemConsumerInfo, MemManager,
-        spill::{Spill, SpillCompressedReader, SpillCompressedWriter, 
try_new_spill},
-    },
 };
 
 pub type OwnedKey = SmallVec<u8, 24>;
diff --git a/native-engine/datafusion-ext-plans/src/agg/avg.rs 
b/native-engine/datafusion-ext-plans/src/agg/avg.rs
index 7512b774..008dd34b 100644
--- a/native-engine/datafusion-ext-plans/src/agg/avg.rs
+++ b/native-engine/datafusion-ext-plans/src/agg/avg.rs
@@ -21,6 +21,7 @@ use std::{
 };
 
 use arrow::{array::*, datatypes::*};
+use auron_memmgr::spill::{SpillCompressedReader, SpillCompressedWriter};
 use datafusion::{
     common::{
         Result,
@@ -30,15 +31,12 @@ use datafusion::{
 };
 use datafusion_ext_commons::downcast_any;
 
-use crate::{
-    agg::{
-        Agg,
-        acc::{AccColumn, AccColumnRef},
-        agg::IdxSelection,
-        count::AggCount,
-        sum::AggSum,
-    },
-    memmgr::spill::{SpillCompressedReader, SpillCompressedWriter},
+use crate::agg::{
+    Agg,
+    acc::{AccColumn, AccColumnRef},
+    agg::IdxSelection,
+    count::AggCount,
+    sum::AggSum,
 };
 
 pub struct AggAvg {
diff --git a/native-engine/datafusion-ext-plans/src/agg/bloom_filter.rs 
b/native-engine/datafusion-ext-plans/src/agg/bloom_filter.rs
index c7b4dd9f..7c6f1da3 100644
--- a/native-engine/datafusion-ext-plans/src/agg/bloom_filter.rs
+++ b/native-engine/datafusion-ext-plans/src/agg/bloom_filter.rs
@@ -24,6 +24,7 @@ use arrow::{
     array::{ArrayRef, AsArray, BinaryBuilder},
     datatypes::{DataType, Int64Type},
 };
+use auron_memmgr::spill::{SpillCompressedReader, SpillCompressedWriter};
 use byteorder::{ReadBytesExt, WriteBytesExt};
 use datafusion::{common::Result, physical_expr::PhysicalExprRef};
 use datafusion_ext_commons::{
@@ -37,7 +38,6 @@ use crate::{
         agg::IdxSelection,
     },
     idx_for, idx_for_zipped,
-    memmgr::spill::{SpillCompressedReader, SpillCompressedWriter},
 };
 
 pub struct AggBloomFilter {
diff --git a/native-engine/datafusion-ext-plans/src/agg/collect.rs 
b/native-engine/datafusion-ext-plans/src/agg/collect.rs
index 0472eede..6ddeed83 100644
--- a/native-engine/datafusion-ext-plans/src/agg/collect.rs
+++ b/native-engine/datafusion-ext-plans/src/agg/collect.rs
@@ -23,6 +23,7 @@ use std::{
 };
 
 use arrow::{array::*, datatypes::*};
+use auron_memmgr::spill::{SpillCompressedReader, SpillCompressedWriter};
 use datafusion::{
     common::{Result, ScalarValue},
     physical_expr::PhysicalExprRef,
@@ -41,7 +42,6 @@ use crate::{
         agg::{Agg, IdxSelection},
     },
     idx_for, idx_for_zipped,
-    memmgr::spill::{SpillCompressedReader, SpillCompressedWriter},
 };
 
 pub type AggCollectSet = AggGenericCollect<AccSetColumn>;
@@ -650,10 +650,10 @@ fn acc_hash(value: impl AsRef<[u8]>) -> u64 {
 #[cfg(test)]
 mod tests {
     use arrow::datatypes::DataType;
+    use auron_memmgr::spill::Spill;
     use datafusion::common::ScalarValue;
 
     use super::*;
-    use crate::memmgr::spill::Spill;
 
     #[test]
     fn test_acc_set_append() {
diff --git a/native-engine/datafusion-ext-plans/src/agg/count.rs 
b/native-engine/datafusion-ext-plans/src/agg/count.rs
index 253bcf58..ab17c5d3 100644
--- a/native-engine/datafusion-ext-plans/src/agg/count.rs
+++ b/native-engine/datafusion-ext-plans/src/agg/count.rs
@@ -21,6 +21,7 @@ use std::{
 };
 
 use arrow::{array::*, datatypes::*};
+use auron_memmgr::spill::{SpillCompressedReader, SpillCompressedWriter};
 use datafusion::{common::Result, physical_expr::PhysicalExprRef};
 use datafusion_ext_commons::{
     downcast_any,
@@ -33,7 +34,6 @@ use crate::{
         agg::{Agg, IdxSelection},
     },
     idx_for, idx_for_zipped, idx_with_iter,
-    memmgr::spill::{SpillCompressedReader, SpillCompressedWriter},
 };
 
 pub struct AggCount {
diff --git a/native-engine/datafusion-ext-plans/src/agg/first.rs 
b/native-engine/datafusion-ext-plans/src/agg/first.rs
index 494631c9..b4bbc115 100644
--- a/native-engine/datafusion-ext-plans/src/agg/first.rs
+++ b/native-engine/datafusion-ext-plans/src/agg/first.rs
@@ -21,6 +21,7 @@ use std::{
 };
 
 use arrow::{array::*, datatypes::*};
+use auron_memmgr::spill::{SpillCompressedReader, SpillCompressedWriter};
 use datafusion::{
     common::{Result, ScalarValue},
     physical_expr::PhysicalExprRef,
@@ -37,7 +38,6 @@ use crate::{
         agg::IdxSelection,
     },
     idx_for_zipped,
-    memmgr::spill::{SpillCompressedReader, SpillCompressedWriter},
 };
 
 pub struct AggFirst {
diff --git a/native-engine/datafusion-ext-plans/src/agg/spark_udaf_wrapper.rs 
b/native-engine/datafusion-ext-plans/src/agg/spark_udaf_wrapper.rs
index 2eda7766..deba5d34 100644
--- a/native-engine/datafusion-ext-plans/src/agg/spark_udaf_wrapper.rs
+++ b/native-engine/datafusion-ext-plans/src/agg/spark_udaf_wrapper.rs
@@ -30,6 +30,7 @@ use auron_jni_bridge::{
     jni_bridge::LocalRef, jni_call, jni_get_byte_array_len, 
jni_get_byte_array_region,
     jni_new_direct_byte_buffer, jni_new_global_ref, jni_new_object, 
jni_new_prim_array,
 };
+use auron_memmgr::spill::{SpillCompressedReader, SpillCompressedWriter};
 use datafusion::{
     common::{DataFusionError, Result},
     physical_expr::PhysicalExprRef,
@@ -47,7 +48,6 @@ use crate::{
         agg::{Agg, IdxSelection},
     },
     idx_for_zipped,
-    memmgr::spill::{SpillCompressedReader, SpillCompressedWriter},
 };
 
 pub struct SparkUDAFWrapper {
diff --git a/native-engine/datafusion-ext-plans/src/agg_exec.rs 
b/native-engine/datafusion-ext-plans/src/agg_exec.rs
index 80c828dc..a77babd0 100644
--- a/native-engine/datafusion-ext-plans/src/agg_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/agg_exec.rs
@@ -24,6 +24,7 @@ use arrow::{
     datatypes::SchemaRef,
 };
 use auron_jni_bridge::conf::{IntConf, 
UDAF_FALLBACK_NUM_UDAFS_TRIGGER_SORT_AGG};
+use auron_memmgr::MemManager;
 use datafusion::{
     common::{Result, Statistics},
     error::DataFusionError,
@@ -50,7 +51,6 @@ use crate::{
     },
     common::{execution_context::ExecutionContext, timer_helper::TimerHelper},
     expand_exec::ExpandExec,
-    memmgr::MemManager,
     project_exec::ProjectExec,
     sort_exec::create_default_ascending_sort_exec,
 };
@@ -418,6 +418,7 @@ mod test {
         datatypes::{DataType, Field, Schema},
         record_batch::RecordBatch,
     };
+    use auron_memmgr::MemManager;
     use datafusion::{
         assert_batches_sorted_eq,
         common::{Result, ScalarValue},
@@ -435,7 +436,6 @@ mod test {
             agg::create_agg,
         },
         agg_exec::AggExec,
-        memmgr::MemManager,
     };
 
     fn build_table_i32(
@@ -691,6 +691,7 @@ mod fuzztest {
         datatypes::{DataType, Float64Type, Int64Type},
         record_batch::RecordBatch,
     };
+    use auron_memmgr::MemManager;
     use datafusion::{
         common::Result,
         physical_expr::expressions as phys_expr,
@@ -708,7 +709,6 @@ mod fuzztest {
             sum::AggSum,
         },
         agg_exec::AggExec,
-        memmgr::MemManager,
     };
 
     #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
diff --git a/native-engine/datafusion-ext-plans/src/common/execution_context.rs 
b/native-engine/datafusion-ext-plans/src/common/execution_context.rs
index b79b1381..15b6e69d 100644
--- a/native-engine/datafusion-ext-plans/src/common/execution_context.rs
+++ b/native-engine/datafusion-ext-plans/src/common/execution_context.rs
@@ -30,6 +30,7 @@ use arrow::{
 };
 use arrow_schema::Schema;
 use auron_jni_bridge::{conf, conf::BooleanConf, is_task_running};
+use auron_memmgr::metrics::SpillMetrics;
 use datafusion::{
     common::{DataFusionError, Result},
     execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext},
@@ -62,7 +63,6 @@ use crate::{
         },
         timer_helper::TimerHelper,
     },
-    memmgr::metrics::SpillMetrics,
     sort_exec::SortExec,
 };
 
diff --git a/native-engine/datafusion-ext-plans/src/common/mod.rs 
b/native-engine/datafusion-ext-plans/src/common/mod.rs
index 2bbe09fc..def20537 100644
--- a/native-engine/datafusion-ext-plans/src/common/mod.rs
+++ b/native-engine/datafusion-ext-plans/src/common/mod.rs
@@ -16,7 +16,6 @@
 pub mod cached_exprs_evaluator;
 pub mod column_pruning;
 pub mod execution_context;
-pub mod ipc_compression;
 pub mod key_rows_output;
 pub mod offsetted;
 pub mod row_null_checker;
diff --git a/native-engine/datafusion-ext-plans/src/expand_exec.rs 
b/native-engine/datafusion-ext-plans/src/expand_exec.rs
index 8521eddc..9030e3cd 100644
--- a/native-engine/datafusion-ext-plans/src/expand_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/expand_exec.rs
@@ -195,6 +195,7 @@ mod test {
         datatypes::{DataType, Field, Schema},
         record_batch::RecordBatch,
     };
+    use auron_memmgr::MemManager;
     use datafusion::{
         assert_batches_eq,
         common::{Result, ScalarValue},
@@ -204,7 +205,7 @@ mod test {
         prelude::SessionContext,
     };
 
-    use crate::{expand_exec::ExpandExec, memmgr::MemManager};
+    use crate::expand_exec::ExpandExec;
 
     // build i32 table
     fn build_table_i32(a: (&str, &Vec<i32>)) -> RecordBatch {
diff --git a/native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs 
b/native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs
index a1e391d8..255e0e1f 100644
--- a/native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs
@@ -51,13 +51,15 @@ use datafusion_ext_commons::{
         array_size::{ArraySize, BatchSize},
         coalesce::coalesce_arrays_unchecked,
     },
-    batch_size, df_execution_err, suggested_batch_mem_size,
+    batch_size, df_execution_err,
+    io::ipc_compression::IpcCompressionReader,
+    suggested_batch_mem_size,
 };
 use jni::objects::{GlobalRef, JObject};
 use once_cell::sync::OnceCell;
 use parking_lot::Mutex;
 
-use crate::common::{execution_context::ExecutionContext, 
ipc_compression::IpcCompressionReader};
+use crate::common::execution_context::ExecutionContext;
 
 #[derive(Debug, Clone)]
 pub struct IpcReaderExec {
diff --git a/native-engine/datafusion-ext-plans/src/ipc_writer_exec.rs 
b/native-engine/datafusion-ext-plans/src/ipc_writer_exec.rs
index a28dfc77..7c840fd7 100644
--- a/native-engine/datafusion-ext-plans/src/ipc_writer_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/ipc_writer_exec.rs
@@ -32,14 +32,12 @@ use datafusion::{
         stream::RecordBatchStreamAdapter,
     },
 };
+use datafusion_ext_commons::io::ipc_compression::IpcCompressionWriter;
 use futures::{StreamExt, TryStreamExt, stream::once};
 use jni::objects::{GlobalRef, JObject};
 use once_cell::sync::OnceCell;
 
-use crate::common::{
-    execution_context::ExecutionContext, ipc_compression::IpcCompressionWriter,
-    timer_helper::TimerHelper,
-};
+use crate::common::{execution_context::ExecutionContext, 
timer_helper::TimerHelper};
 
 #[derive(Debug)]
 pub struct IpcWriterExec {
diff --git a/native-engine/datafusion-ext-plans/src/joins/test.rs 
b/native-engine/datafusion-ext-plans/src/joins/test.rs
index 06d2ad59..9125ed53 100644
--- a/native-engine/datafusion-ext-plans/src/joins/test.rs
+++ b/native-engine/datafusion-ext-plans/src/joins/test.rs
@@ -25,6 +25,7 @@ mod tests {
         datatypes::{DataType, Field, Schema, SchemaRef},
         record_batch::RecordBatch,
     };
+    use auron_memmgr::MemManager;
     use datafusion::{
         assert_batches_sorted_eq,
         common::JoinSide,
@@ -38,7 +39,6 @@ mod tests {
         broadcast_join_build_hash_map_exec::BroadcastJoinBuildHashMapExec,
         broadcast_join_exec::BroadcastJoinExec,
         joins::join_utils::{JoinType, JoinType::*},
-        memmgr::MemManager,
         sort_merge_join_exec::SortMergeJoinExec,
     };
 
diff --git a/native-engine/datafusion-ext-plans/src/lib.rs 
b/native-engine/datafusion-ext-plans/src/lib.rs
index 6d510ce2..f85f3b26 100644
--- a/native-engine/datafusion-ext-plans/src/lib.rs
+++ b/native-engine/datafusion-ext-plans/src/lib.rs
@@ -48,9 +48,6 @@ pub mod sort_merge_join_exec;
 pub mod union_exec;
 pub mod window_exec;
 
-// memory management
-pub mod memmgr;
-
 // helper modules
 pub mod common;
 pub mod generate;
diff --git a/native-engine/datafusion-ext-plans/src/limit_exec.rs 
b/native-engine/datafusion-ext-plans/src/limit_exec.rs
index 81110456..1b9f5892 100644
--- a/native-engine/datafusion-ext-plans/src/limit_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/limit_exec.rs
@@ -153,6 +153,7 @@ mod test {
         datatypes::{DataType, Field, Schema},
         record_batch::RecordBatch,
     };
+    use auron_memmgr::MemManager;
     use datafusion::{
         assert_batches_eq,
         common::{Result, stats::Precision},
@@ -160,7 +161,7 @@ mod test {
         prelude::SessionContext,
     };
 
-    use crate::{limit_exec::LimitExec, memmgr::MemManager};
+    use crate::limit_exec::LimitExec;
 
     fn build_table_i32(
         a: (&str, &Vec<i32>),
diff --git a/native-engine/datafusion-ext-plans/src/rss_shuffle_writer_exec.rs 
b/native-engine/datafusion-ext-plans/src/rss_shuffle_writer_exec.rs
index 22c9687f..bad86413 100644
--- a/native-engine/datafusion-ext-plans/src/rss_shuffle_writer_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/rss_shuffle_writer_exec.rs
@@ -19,6 +19,7 @@ use std::{any::Any, fmt::Debug, sync::Arc};
 
 use async_trait::async_trait;
 use auron_jni_bridge::{jni_call_static, jni_new_global_ref, jni_new_string};
+use auron_memmgr::MemManager;
 use datafusion::{
     arrow::datatypes::SchemaRef,
     error::{DataFusionError, Result},
@@ -36,7 +37,6 @@ use once_cell::sync::OnceCell;
 
 use crate::{
     common::execution_context::ExecutionContext,
-    memmgr::MemManager,
     shuffle::{
         Partitioning, ShuffleRepartitioner,
         rss_single_repartitioner::RssSingleShuffleRepartitioner,
diff --git a/native-engine/datafusion-ext-plans/src/shuffle/buffered_data.rs 
b/native-engine/datafusion-ext-plans/src/shuffle/buffered_data.rs
index d6df4f19..6b88ba8f 100644
--- a/native-engine/datafusion-ext-plans/src/shuffle/buffered_data.rs
+++ b/native-engine/datafusion-ext-plans/src/shuffle/buffered_data.rs
@@ -27,6 +27,7 @@ use datafusion_ext_commons::{
         selection::{BatchInterleaver, create_batch_interleaver},
     },
     compute_suggested_batch_size_for_output, df_execution_err,
+    io::ipc_compression::IpcCompressionWriter,
 };
 use itertools::Itertools;
 use jni::objects::GlobalRef;
@@ -35,7 +36,6 @@ use parking_lot::Mutex;
 
 use crate::{
     common::{
-        ipc_compression::IpcCompressionWriter,
         offsetted::{Offsetted, OffsettedMergeIterator},
         timer_helper::TimerHelper,
     },
diff --git a/native-engine/datafusion-ext-plans/src/shuffle/rss_single_repartitioner.rs b/native-engine/datafusion-ext-plans/src/shuffle/rss_single_repartitioner.rs
index ca7ee73d..a41f5516 100644
--- a/native-engine/datafusion-ext-plans/src/shuffle/rss_single_repartitioner.rs
+++ b/native-engine/datafusion-ext-plans/src/shuffle/rss_single_repartitioner.rs
@@ -17,14 +17,11 @@ use std::sync::Arc;
 
 use async_trait::async_trait;
 use datafusion::{arrow::record_batch::RecordBatch, common::Result, physical_plan::metrics::Time};
-use datafusion_ext_commons::df_execution_err;
+use datafusion_ext_commons::{df_execution_err, io::ipc_compression::IpcCompressionWriter};
 use jni::objects::GlobalRef;
 use parking_lot::Mutex;
 
-use crate::{
-    common::ipc_compression::IpcCompressionWriter,
-    shuffle::{ShuffleRepartitioner, rss::RssWriter},
-};
+use crate::shuffle::{ShuffleRepartitioner, rss::RssWriter};
 
 pub struct RssSingleShuffleRepartitioner {
     rss_partition_writer: Arc<Mutex<IpcCompressionWriter<RssWriter>>>,
diff --git a/native-engine/datafusion-ext-plans/src/shuffle/rss_sort_repartitioner.rs b/native-engine/datafusion-ext-plans/src/shuffle/rss_sort_repartitioner.rs
index 58d3ed0d..b0b2c825 100644
--- a/native-engine/datafusion-ext-plans/src/shuffle/rss_sort_repartitioner.rs
+++ b/native-engine/datafusion-ext-plans/src/shuffle/rss_sort_repartitioner.rs
@@ -17,15 +17,13 @@ use std::sync::Weak;
 
 use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
+use auron_memmgr::{MemConsumer, MemConsumerInfo, MemManager};
 use datafusion::{common::Result, physical_plan::metrics::Time};
 use datafusion_ext_commons::arrow::array_size::BatchSize;
 use futures::lock::Mutex;
 use jni::objects::GlobalRef;
 
-use crate::{
-    memmgr::{MemConsumer, MemConsumerInfo, MemManager},
-    shuffle::{Partitioning, ShuffleRepartitioner, buffered_data::BufferedData},
-};
+use crate::shuffle::{Partitioning, ShuffleRepartitioner, buffered_data::BufferedData};
 
 pub struct RssSortShuffleRepartitioner {
     mem_consumer_info: Option<Weak<MemConsumerInfo>>,
diff --git a/native-engine/datafusion-ext-plans/src/shuffle/single_repartitioner.rs b/native-engine/datafusion-ext-plans/src/shuffle/single_repartitioner.rs
index bfe87512..d5f4f31d 100644
--- a/native-engine/datafusion-ext-plans/src/shuffle/single_repartitioner.rs
+++ b/native-engine/datafusion-ext-plans/src/shuffle/single_repartitioner.rs
@@ -22,13 +22,11 @@ use std::{
 use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
 use datafusion::{common::Result, physical_plan::metrics::Time};
+use datafusion_ext_commons::io::ipc_compression::IpcCompressionWriter;
 use tokio::sync::Mutex;
 
 use crate::{
-    common::{
-        ipc_compression::IpcCompressionWriter,
-        timer_helper::{TimedWriter, TimerHelper},
-    },
+    common::timer_helper::{TimedWriter, TimerHelper},
     shuffle::{ShuffleRepartitioner, open_shuffle_file},
 };
 
diff --git a/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs b/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs
index 169ad212..b76ed631 100644
--- a/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs
+++ b/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs
@@ -20,6 +20,10 @@ use std::{
 
 use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
+use auron_memmgr::{
+    MemConsumer, MemConsumerInfo, MemManager,
+    spill::{OwnedSpillBufReader, Spill, try_new_spill},
+};
 use bytesize::ByteSize;
 use datafusion::{
     common::{DataFusionError, Result},
@@ -34,10 +38,6 @@ use crate::{
         offsetted::{Offsetted, OffsettedMergeIterator},
         timer_helper::TimerHelper,
     },
-    memmgr::{
-        MemConsumer, MemConsumerInfo, MemManager,
-        spill::{OwnedSpillBufReader, Spill, try_new_spill},
-    },
     shuffle::{Partitioning, ShuffleRepartitioner, buffered_data::BufferedData, open_shuffle_file},
 };
 
diff --git a/native-engine/datafusion-ext-plans/src/shuffle_writer_exec.rs b/native-engine/datafusion-ext-plans/src/shuffle_writer_exec.rs
index afa16fd4..32c2dc14 100644
--- a/native-engine/datafusion-ext-plans/src/shuffle_writer_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/shuffle_writer_exec.rs
@@ -19,6 +19,7 @@ use std::{any::Any, fmt::Debug, sync::Arc};
 
 use arrow::datatypes::SchemaRef;
 use async_trait::async_trait;
+use auron_memmgr::MemManager;
 use datafusion::{
     error::Result,
     execution::context::TaskContext,
@@ -36,7 +37,6 @@ use once_cell::sync::OnceCell;
 
 use crate::{
     common::execution_context::ExecutionContext,
-    memmgr::MemManager,
     shuffle::{
         Partitioning, ShuffleRepartitioner, single_repartitioner::SingleShuffleRepartitioner,
         sort_repartitioner::SortShuffleRepartitioner,
diff --git a/native-engine/datafusion-ext-plans/src/sort_exec.rs b/native-engine/datafusion-ext-plans/src/sort_exec.rs
index 2a201cdf..24d0beb6 100644
--- a/native-engine/datafusion-ext-plans/src/sort_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/sort_exec.rs
@@ -33,6 +33,10 @@ use arrow::{
     row::{RowConverter, Rows, SortField},
 };
 use async_trait::async_trait;
+use auron_memmgr::{
+    MemConsumer, MemConsumerInfo, MemManager,
+    spill::{Spill, SpillCompressedReader, SpillCompressedWriter, try_new_spill},
+};
 use bytesize::ByteSize;
 use datafusion::{
     common::{DataFusionError, Result, Statistics, utils::proxy::VecAllocExt},
@@ -62,19 +66,13 @@ use itertools::Itertools;
 use once_cell::sync::OnceCell;
 use parking_lot::Mutex;
 
-use crate::{
-    common::{
-        execution_context::{
-            ExecutionContext, WrappedRecordBatchSender, WrappedRecordBatchWithKeyRowsSender,
-            WrappedSenderTrait,
-        },
-        key_rows_output::{RecordBatchWithKeyRows, SendableRecordBatchWithKeyRowsStream},
-        timer_helper::TimerHelper,
-    },
-    memmgr::{
-        MemConsumer, MemConsumerInfo, MemManager,
-        spill::{Spill, SpillCompressedReader, SpillCompressedWriter, try_new_spill},
+use crate::common::{
+    execution_context::{
+        ExecutionContext, WrappedRecordBatchSender, WrappedRecordBatchWithKeyRowsSender,
+        WrappedSenderTrait,
     },
+    key_rows_output::{RecordBatchWithKeyRows, SendableRecordBatchWithKeyRowsStream},
+    timer_helper::TimerHelper,
 };
 
 // reserve memory for each spill
@@ -1412,6 +1410,7 @@ mod test {
         datatypes::{DataType, Field, Schema},
         record_batch::RecordBatch,
     };
+    use auron_memmgr::MemManager;
     use datafusion::{
         assert_batches_eq,
         common::Result,
@@ -1420,7 +1419,7 @@ mod test {
         prelude::SessionContext,
     };
 
-    use crate::{memmgr::MemManager, sort_exec::SortExec};
+    use crate::sort_exec::SortExec;
 
     fn build_table_i32(
         a: (&str, &Vec<i32>),
@@ -1499,6 +1498,7 @@ mod fuzztest {
         compute::{SortOptions, concat_batches},
         record_batch::RecordBatch,
     };
+    use auron_memmgr::MemManager;
     use datafusion::{
         common::{Result, stats::Precision},
         physical_expr::{LexOrdering, PhysicalSortExpr, expressions::Column},
@@ -1506,7 +1506,7 @@ mod fuzztest {
         prelude::{SessionConfig, SessionContext},
     };
 
-    use crate::{memmgr::MemManager, sort_exec::SortExec};
+    use crate::sort_exec::SortExec;
 
     #[tokio::test]
     async fn fuzztest_in_mem_sorting() -> Result<()> {


Reply via email to