This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 34d50a7a7 chore: extract shuffle module into separate crate (#3749)
34d50a7a7 is described below
commit 34d50a7a77a65e554a8f86756b2c11f53e08cccf
Author: Andy Grove <[email protected]>
AuthorDate: Mon Mar 23 06:45:46 2026 -0600
chore: extract shuffle module into separate crate (#3749)
---
native/Cargo.lock | 32 +++++-
native/Cargo.toml | 5 +-
native/common/README.md | 25 ++++
native/common/src/lib.rs | 3 +
.../{core/src/execution => common/src}/tracing.rs | 21 ++--
.../src/execution/mod.rs => common/src/utils.rs} | 39 +++----
native/core/Cargo.toml | 16 +--
native/core/src/execution/mod.rs | 2 +-
native/core/src/execution/tracing.rs | 126 +--------------------
native/core/src/execution/utils.rs | 21 +---
native/jni-bridge/README.md | 25 ++++
native/shuffle/Cargo.toml | 66 +++++++++++
native/shuffle/README.md | 25 ++++
native/{core => shuffle}/benches/row_columnar.rs | 6 +-
native/{core => shuffle}/benches/shuffle_writer.rs | 6 +-
.../src/execution/shuffle => shuffle/src}/codec.rs | 2 +-
.../shuffle => shuffle/src}/comet_partitioning.rs | 2 +-
.../shuffle/mod.rs => shuffle/src/lib.rs} | 10 +-
.../execution/shuffle => shuffle/src}/metrics.rs | 20 ++--
.../shuffle => shuffle/src}/partitioners/mod.rs | 8 +-
.../src}/partitioners/multi_partition.rs | 14 +--
.../partitioners/partitioned_batch_iterator.rs | 0
.../src}/partitioners/single_partition.rs | 8 +-
.../shuffle => shuffle/src}/shuffle_writer.rs | 12 +-
.../shuffle => shuffle/src}/spark_unsafe/list.rs | 14 +--
.../shuffle => shuffle/src}/spark_unsafe/map.rs | 6 +-
.../shuffle => shuffle/src}/spark_unsafe/mod.rs | 0
.../shuffle => shuffle/src}/spark_unsafe/row.rs | 20 ++--
.../src}/writers/buf_batch_writer.rs | 2 +-
.../shuffle => shuffle/src}/writers/mod.rs | 4 +-
.../src}/writers/partition_writer.rs | 8 +-
31 files changed, 274 insertions(+), 274 deletions(-)
diff --git a/native/Cargo.lock b/native/Cargo.lock
index 340f0fe0c..bc015fc38 100644
--- a/native/Cargo.lock
+++ b/native/Cargo.lock
@@ -1834,12 +1834,13 @@ dependencies = [
"aws-config",
"aws-credential-types",
"bytes",
- "crc32fast",
"criterion",
"datafusion",
+ "datafusion-comet-common",
"datafusion-comet-jni-bridge",
"datafusion-comet-objectstore-hdfs",
"datafusion-comet-proto",
+ "datafusion-comet-shuffle",
"datafusion-comet-spark-expr",
"datafusion-datasource",
"datafusion-functions-nested",
@@ -1856,7 +1857,6 @@ dependencies = [
"lazy_static",
"log",
"log4rs",
- "lz4_flex 0.13.0",
"mimalloc",
"num",
"object_store",
@@ -1872,15 +1872,12 @@ dependencies = [
"rand 0.10.0",
"reqwest",
"serde_json",
- "simd-adler32",
- "snap",
"tempfile",
"tikv-jemalloc-ctl",
"tikv-jemallocator",
"tokio",
"url",
"uuid",
- "zstd",
]
[[package]]
@@ -1949,6 +1946,31 @@ dependencies = [
"prost-build",
]
+[[package]]
+name = "datafusion-comet-shuffle"
+version = "0.15.0"
+dependencies = [
+ "arrow",
+ "async-trait",
+ "bytes",
+ "crc32fast",
+ "criterion",
+ "datafusion",
+ "datafusion-comet-common",
+ "datafusion-comet-jni-bridge",
+ "datafusion-comet-spark-expr",
+ "futures",
+ "itertools 0.14.0",
+ "jni",
+ "log",
+ "lz4_flex 0.13.0",
+ "simd-adler32",
+ "snap",
+ "tempfile",
+ "tokio",
+ "zstd",
+]
+
[[package]]
name = "datafusion-comet-spark-expr"
version = "0.15.0"
diff --git a/native/Cargo.toml b/native/Cargo.toml
index 693221b15..e75c1fd24 100644
--- a/native/Cargo.toml
+++ b/native/Cargo.toml
@@ -16,8 +16,8 @@
# under the License.
[workspace]
-default-members = ["core", "spark-expr", "common", "proto", "jni-bridge"]
-members = ["core", "spark-expr", "common", "proto", "jni-bridge", "hdfs", "fs-hdfs"]
+default-members = ["core", "spark-expr", "common", "proto", "jni-bridge", "shuffle"]
+members = ["core", "spark-expr", "common", "proto", "jni-bridge", "shuffle", "hdfs", "fs-hdfs"]
resolver = "2"
[workspace.package]
@@ -46,6 +46,7 @@ datafusion-comet-spark-expr = { path = "spark-expr" }
datafusion-comet-common = { path = "common" }
datafusion-comet-jni-bridge = { path = "jni-bridge" }
datafusion-comet-proto = { path = "proto" }
+datafusion-comet-shuffle = { path = "shuffle" }
chrono = { version = "0.4", default-features = false, features = ["clock"] }
chrono-tz = { version = "0.10" }
futures = "0.3.32"
diff --git a/native/common/README.md b/native/common/README.md
new file mode 100644
index 000000000..842b441b5
--- /dev/null
+++ b/native/common/README.md
@@ -0,0 +1,25 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# datafusion-comet-common: Common Types
+
+This crate provides common types shared across Apache DataFusion Comet crates and is maintained as part of the
+[Apache DataFusion Comet] subproject.
+
+[Apache DataFusion Comet]: https://github.com/apache/datafusion-comet/
diff --git a/native/common/src/lib.rs b/native/common/src/lib.rs
index 9319d7347..a9549badb 100644
--- a/native/common/src/lib.rs
+++ b/native/common/src/lib.rs
@@ -17,6 +17,9 @@
mod error;
mod query_context;
+pub mod tracing;
+mod utils;
pub use error::{decimal_overflow_error, SparkError, SparkErrorWithContext, SparkResult};
pub use query_context::{create_query_context_map, QueryContext, QueryContextMap};
+pub use utils::bytes_to_i128;
diff --git a/native/core/src/execution/tracing.rs b/native/common/src/tracing.rs
similarity index 88%
copy from native/core/src/execution/tracing.rs
copy to native/common/src/tracing.rs
index 01351565f..58bea64a7 100644
--- a/native/core/src/execution/tracing.rs
+++ b/native/common/src/tracing.rs
@@ -16,12 +16,11 @@
// under the License.
use datafusion::common::instant::Instant;
-use once_cell::sync::Lazy;
use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
-use std::sync::{Arc, Mutex};
+use std::sync::{Arc, LazyLock, Mutex};
-pub(crate) static RECORDER: Lazy<Recorder> = Lazy::new(Recorder::new);
+pub static RECORDER: LazyLock<Recorder> = LazyLock::new(Recorder::new);
/// Log events using Chrome trace format JSON
/// https://github.com/catapult-project/catapult/blob/main/tracing/README.md
@@ -30,6 +29,12 @@ pub struct Recorder {
writer: Arc<Mutex<BufWriter<File>>>,
}
+impl Default for Recorder {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
impl Recorder {
pub fn new() -> Self {
let file = OpenOptions::new()
@@ -94,19 +99,19 @@ impl Recorder {
}
}
-pub(crate) fn trace_begin(name: &str) {
+pub fn trace_begin(name: &str) {
RECORDER.begin_task(name);
}
-pub(crate) fn trace_end(name: &str) {
+pub fn trace_end(name: &str) {
RECORDER.end_task(name);
}
-pub(crate) fn log_memory_usage(name: &str, value: u64) {
+pub fn log_memory_usage(name: &str, value: u64) {
RECORDER.log_memory_usage(name, value);
}
-pub(crate) fn with_trace<T, F>(label: &str, tracing_enabled: bool, f: F) -> T
+pub fn with_trace<T, F>(label: &str, tracing_enabled: bool, f: F) -> T
where
F: FnOnce() -> T,
{
@@ -123,7 +128,7 @@ where
result
}
-pub(crate) async fn with_trace_async<F, Fut, T>(label: &str, tracing_enabled: bool, f: F) -> T
+pub async fn with_trace_async<F, Fut, T>(label: &str, tracing_enabled: bool, f: F) -> T
where
F: FnOnce() -> Fut,
Fut: std::future::Future<Output = T>,
diff --git a/native/core/src/execution/mod.rs b/native/common/src/utils.rs
similarity index 59%
copy from native/core/src/execution/mod.rs
copy to native/common/src/utils.rs
index 85fc67246..12283db30 100644
--- a/native/core/src/execution/mod.rs
+++ b/native/common/src/utils.rs
@@ -15,28 +15,23 @@
// specific language governing permissions and limitations
// under the License.
-//! PoC of vectorization execution through JNI to Rust.
-pub mod columnar_to_row;
-pub mod expressions;
-pub mod jni_api;
-pub(crate) mod metrics;
-pub mod operators;
-pub(crate) mod planner;
-pub mod serde;
-pub mod shuffle;
-pub(crate) mod sort;
-pub(crate) mod spark_plan;
-pub use datafusion_comet_spark_expr::timezone;
-mod memory_pools;
-pub(crate) mod spark_config;
-pub(crate) mod tracing;
-pub(crate) mod utils;
+/// Converts a slice of bytes to i128. The bytes are serialized in big-endian order by
+/// `BigInteger.toByteArray()` in Java.
+pub fn bytes_to_i128(slice: &[u8]) -> i128 {
+ let mut bytes = [0; 16];
+ let mut i = 0;
+ while i != 16 && i != slice.len() {
+ bytes[i] = slice[slice.len() - 1 - i];
+ i += 1;
+ }
-#[cfg(test)]
-mod tests {
- #[test]
- fn it_works() {
- let result = 2 + 2;
- assert_eq!(result, 4);
+ // if the decimal is negative, we need to flip all the bits
+ if (slice[0] as i8) < 0 {
+ while i < 16 {
+ bytes[i] = !bytes[i];
+ i += 1;
+ }
}
+
+ i128::from_le_bytes(bytes)
}
diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml
index 3f305a631..b66830ecb 100644
--- a/native/core/Cargo.toml
+++ b/native/core/Cargo.toml
@@ -47,10 +47,6 @@ log = "0.4"
log4rs = "1.4.0"
prost = "0.14.3"
jni = "0.21"
-snap = "1.1"
-# we disable default features in lz4_flex to force the use of the faster unsafe encoding and decoding implementation
-lz4_flex = { version = "0.13.0", default-features = false, features = ["frame"] }
-zstd = "0.13.3"
rand = { workspace = true }
num = { workspace = true }
bytes = { workspace = true }
@@ -62,11 +58,11 @@ datafusion-physical-expr-adapter = { workspace = true }
datafusion-datasource = { workspace = true }
datafusion-spark = { workspace = true }
once_cell = "1.18.0"
-crc32fast = "1.3.2"
-simd-adler32 = "0.3.7"
+datafusion-comet-common = { workspace = true }
datafusion-comet-spark-expr = { workspace = true }
datafusion-comet-jni-bridge = { workspace = true }
datafusion-comet-proto = { workspace = true }
+datafusion-comet-shuffle = { workspace = true }
object_store = { workspace = true }
url = { workspace = true }
aws-config = { workspace = true }
@@ -121,14 +117,6 @@ harness = false
name = "bit_util"
harness = false
-[[bench]]
-name = "row_columnar"
-harness = false
-
-[[bench]]
-name = "shuffle_writer"
-harness = false
-
[[bench]]
name = "parquet_decode"
harness = false
diff --git a/native/core/src/execution/mod.rs b/native/core/src/execution/mod.rs
index 85fc67246..f556fce41 100644
--- a/native/core/src/execution/mod.rs
+++ b/native/core/src/execution/mod.rs
@@ -23,7 +23,7 @@ pub(crate) mod metrics;
pub mod operators;
pub(crate) mod planner;
pub mod serde;
-pub mod shuffle;
+pub use datafusion_comet_shuffle as shuffle;
pub(crate) mod sort;
pub(crate) mod spark_plan;
pub use datafusion_comet_spark_expr::timezone;
diff --git a/native/core/src/execution/tracing.rs b/native/core/src/execution/tracing.rs
index 01351565f..b02006efb 100644
--- a/native/core/src/execution/tracing.rs
+++ b/native/core/src/execution/tracing.rs
@@ -15,128 +15,4 @@
// specific language governing permissions and limitations
// under the License.
-use datafusion::common::instant::Instant;
-use once_cell::sync::Lazy;
-use std::fs::{File, OpenOptions};
-use std::io::{BufWriter, Write};
-use std::sync::{Arc, Mutex};
-
-pub(crate) static RECORDER: Lazy<Recorder> = Lazy::new(Recorder::new);
-
-/// Log events using Chrome trace format JSON
-/// https://github.com/catapult-project/catapult/blob/main/tracing/README.md
-pub struct Recorder {
- now: Instant,
- writer: Arc<Mutex<BufWriter<File>>>,
-}
-
-impl Recorder {
- pub fn new() -> Self {
- let file = OpenOptions::new()
- .create(true)
- .append(true)
- .open("comet-event-trace.json")
- .expect("Error writing tracing");
-
- let mut writer = BufWriter::new(file);
-
- // Write start of JSON array. Note that there is no requirement to write
- // the closing ']'.
- writer
- .write_all("[ ".as_bytes())
- .expect("Error writing tracing");
- Self {
- now: Instant::now(),
- writer: Arc::new(Mutex::new(writer)),
- }
- }
- pub fn begin_task(&self, name: &str) {
- self.log_event(name, "B")
- }
-
- pub fn end_task(&self, name: &str) {
- self.log_event(name, "E")
- }
-
- pub fn log_memory_usage(&self, name: &str, usage_bytes: u64) {
- let usage_mb = (usage_bytes as f64 / 1024.0 / 1024.0) as usize;
- let json = format!(
- "{{ \"name\": \"{name}\", \"cat\": \"PERF\", \"ph\": \"C\", \"pid\": 1, \"tid\": {}, \"ts\": {}, \"args\": {{ \"{name}\": {usage_mb} }} }},\n",
- Self::get_thread_id(),
- self.now.elapsed().as_micros()
- );
- let mut writer = self.writer.lock().unwrap();
- writer
- .write_all(json.as_bytes())
- .expect("Error writing tracing");
- }
-
- fn log_event(&self, name: &str, ph: &str) {
- let json = format!(
- "{{ \"name\": \"{}\", \"cat\": \"PERF\", \"ph\": \"{ph}\", \"pid\": 1, \"tid\": {}, \"ts\": {} }},\n",
- name,
- Self::get_thread_id(),
- self.now.elapsed().as_micros()
- );
- let mut writer = self.writer.lock().unwrap();
- writer
- .write_all(json.as_bytes())
- .expect("Error writing tracing");
- }
-
- fn get_thread_id() -> u64 {
- let thread_id = std::thread::current().id();
- format!("{thread_id:?}")
- .trim_start_matches("ThreadId(")
- .trim_end_matches(")")
- .parse()
- .expect("Error parsing thread id")
- }
-}
-
-pub(crate) fn trace_begin(name: &str) {
- RECORDER.begin_task(name);
-}
-
-pub(crate) fn trace_end(name: &str) {
- RECORDER.end_task(name);
-}
-
-pub(crate) fn log_memory_usage(name: &str, value: u64) {
- RECORDER.log_memory_usage(name, value);
-}
-
-pub(crate) fn with_trace<T, F>(label: &str, tracing_enabled: bool, f: F) -> T
-where
- F: FnOnce() -> T,
-{
- if tracing_enabled {
- trace_begin(label);
- }
-
- let result = f();
-
- if tracing_enabled {
- trace_end(label);
- }
-
- result
-}
-
-pub(crate) async fn with_trace_async<F, Fut, T>(label: &str, tracing_enabled: bool, f: F) -> T
-where
- F: FnOnce() -> Fut,
- Fut: std::future::Future<Output = T>,
-{
- if tracing_enabled {
- trace_begin(label);
- }
-
- let result = f().await;
-
- if tracing_enabled {
- trace_end(label);
- }
-
- result
-}
+pub(crate) use datafusion_comet_common::tracing::*;
diff --git a/native/core/src/execution/utils.rs b/native/core/src/execution/utils.rs
index f95423aa7..2fe6f8758 100644
--- a/native/core/src/execution/utils.rs
+++ b/native/core/src/execution/utils.rs
@@ -97,23 +97,4 @@ impl SparkArrowConvert for ArrayData {
}
}
-/// Converts a slice of bytes to i128. The bytes are serialized in big-endian order by
-/// `BigInteger.toByteArray()` in Java.
-pub fn bytes_to_i128(slice: &[u8]) -> i128 {
- let mut bytes = [0; 16];
- let mut i = 0;
- while i != 16 && i != slice.len() {
- bytes[i] = slice[slice.len() - 1 - i];
- i += 1;
- }
-
- // if the decimal is negative, we need to flip all the bits
- if (slice[0] as i8) < 0 {
- while i < 16 {
- bytes[i] = !bytes[i];
- i += 1;
- }
- }
-
- i128::from_le_bytes(bytes)
-}
+pub use datafusion_comet_common::bytes_to_i128;
diff --git a/native/jni-bridge/README.md b/native/jni-bridge/README.md
new file mode 100644
index 000000000..d49a3c256
--- /dev/null
+++ b/native/jni-bridge/README.md
@@ -0,0 +1,25 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# datafusion-comet-jni-bridge: JNI Bridge
+
+This crate provides the JNI interaction layer for Apache DataFusion Comet and is maintained as part of the
+[Apache DataFusion Comet] subproject.
+
+[Apache DataFusion Comet]: https://github.com/apache/datafusion-comet/
diff --git a/native/shuffle/Cargo.toml b/native/shuffle/Cargo.toml
new file mode 100644
index 000000000..e28827edc
--- /dev/null
+++ b/native/shuffle/Cargo.toml
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "datafusion-comet-shuffle"
+description = "Apache DataFusion Comet: shuffle writer and reader"
+version = { workspace = true }
+homepage = { workspace = true }
+repository = { workspace = true }
+authors = { workspace = true }
+readme = { workspace = true }
+license = { workspace = true }
+edition = { workspace = true }
+
+publish = false
+
+[dependencies]
+arrow = { workspace = true }
+async-trait = { workspace = true }
+bytes = { workspace = true }
+crc32fast = "1.3.2"
+datafusion = { workspace = true }
+datafusion-comet-common = { workspace = true }
+datafusion-comet-jni-bridge = { workspace = true }
+datafusion-comet-spark-expr = { workspace = true }
+futures = { workspace = true }
+itertools = "0.14.0"
+jni = "0.21"
+log = "0.4"
+lz4_flex = { version = "0.13.0", default-features = false, features = ["frame"] }
+simd-adler32 = "0.3.7"
+snap = "1.1"
+tokio = { version = "1", features = ["rt-multi-thread"] }
+zstd = "0.13.3"
+
+[dev-dependencies]
+criterion = { version = "0.7", features = ["async", "async_tokio", "async_std"] }
+datafusion = { workspace = true, features = ["parquet_encryption", "sql"] }
+itertools = "0.14.0"
+tempfile = "3.26.0"
+
+[lib]
+name = "datafusion_comet_shuffle"
+path = "src/lib.rs"
+
+[[bench]]
+name = "shuffle_writer"
+harness = false
+
+[[bench]]
+name = "row_columnar"
+harness = false
diff --git a/native/shuffle/README.md b/native/shuffle/README.md
new file mode 100644
index 000000000..8fba6b032
--- /dev/null
+++ b/native/shuffle/README.md
@@ -0,0 +1,25 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# datafusion-comet-shuffle: Shuffle Writer and Reader
+
+This crate provides the shuffle writer and reader implementation for Apache DataFusion Comet and is maintained as part
+of the [Apache DataFusion Comet] subproject.
+
+[Apache DataFusion Comet]: https://github.com/apache/datafusion-comet/
diff --git a/native/core/benches/row_columnar.rs b/native/shuffle/benches/row_columnar.rs
similarity index 99%
rename from native/core/benches/row_columnar.rs
rename to native/shuffle/benches/row_columnar.rs
index 4ee153906..7d3951b4d 100644
--- a/native/core/benches/row_columnar.rs
+++ b/native/shuffle/benches/row_columnar.rs
@@ -22,11 +22,11 @@
//! list, and map types.
use arrow::datatypes::{DataType as ArrowDataType, Field, Fields};
-use comet::execution::shuffle::spark_unsafe::row::{
+use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
+use datafusion_comet_shuffle::spark_unsafe::row::{
process_sorted_row_partition, SparkUnsafeObject, SparkUnsafeRow,
};
-use comet::execution::shuffle::CompressionCodec;
-use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
+use datafusion_comet_shuffle::CompressionCodec;
use std::sync::Arc;
use tempfile::Builder;
diff --git a/native/core/benches/shuffle_writer.rs b/native/shuffle/benches/shuffle_writer.rs
similarity index 99%
rename from native/core/benches/shuffle_writer.rs
rename to native/shuffle/benches/shuffle_writer.rs
index 0857ef78c..27abd919f 100644
--- a/native/core/benches/shuffle_writer.rs
+++ b/native/shuffle/benches/shuffle_writer.rs
@@ -19,9 +19,6 @@ use arrow::array::builder::{Date32Builder, Decimal128Builder, Int32Builder};
use arrow::array::{builder::StringBuilder, Array, Int32Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::row::{RowConverter, SortField};
-use comet::execution::shuffle::{
- CometPartitioning, CompressionCodec, ShuffleBlockWriter, ShuffleWriterExec,
-};
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::source::DataSourceExec;
@@ -32,6 +29,9 @@ use datafusion::{
physical_plan::{common::collect, ExecutionPlan},
prelude::SessionContext,
};
+use datafusion_comet_shuffle::{
+ CometPartitioning, CompressionCodec, ShuffleBlockWriter, ShuffleWriterExec,
+};
use itertools::Itertools;
use std::io::Cursor;
use std::sync::Arc;
diff --git a/native/core/src/execution/shuffle/codec.rs b/native/shuffle/src/codec.rs
similarity index 99%
rename from native/core/src/execution/shuffle/codec.rs
rename to native/shuffle/src/codec.rs
index 33e6989d4..c8edc2468 100644
--- a/native/core/src/execution/shuffle/codec.rs
+++ b/native/shuffle/src/codec.rs
@@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-use crate::errors::{CometError, CometResult};
use arrow::array::RecordBatch;
use arrow::datatypes::Schema;
use arrow::ipc::reader::StreamReader;
@@ -25,6 +24,7 @@ use crc32fast::Hasher;
use datafusion::common::DataFusionError;
use datafusion::error::Result;
use datafusion::physical_plan::metrics::Time;
+use datafusion_comet_jni_bridge::errors::{CometError, CometResult};
use simd_adler32::Adler32;
use std::io::{Cursor, Seek, SeekFrom, Write};
diff --git a/native/core/src/execution/shuffle/comet_partitioning.rs b/native/shuffle/src/comet_partitioning.rs
similarity index 98%
rename from native/core/src/execution/shuffle/comet_partitioning.rs
rename to native/shuffle/src/comet_partitioning.rs
index b8d68cd21..c269539a6 100644
--- a/native/core/src/execution/shuffle/comet_partitioning.rs
+++ b/native/shuffle/src/comet_partitioning.rs
@@ -47,7 +47,7 @@ impl CometPartitioning {
}
}
-pub(super) fn pmod(hash: u32, n: usize) -> usize {
+pub(crate) fn pmod(hash: u32, n: usize) -> usize {
let hash = hash as i32;
let n = n as i32;
let r = hash % n;
diff --git a/native/core/src/execution/shuffle/mod.rs b/native/shuffle/src/lib.rs
similarity index 88%
rename from native/core/src/execution/shuffle/mod.rs
rename to native/shuffle/src/lib.rs
index 6018cff50..7c2fc8403 100644
--- a/native/core/src/execution/shuffle/mod.rs
+++ b/native/shuffle/src/lib.rs
@@ -15,13 +15,13 @@
// specific language governing permissions and limitations
// under the License.
-pub(crate) mod codec;
-mod comet_partitioning;
-mod metrics;
-mod partitioners;
+pub mod codec;
+pub(crate) mod comet_partitioning;
+pub(crate) mod metrics;
+pub(crate) mod partitioners;
mod shuffle_writer;
pub mod spark_unsafe;
-mod writers;
+pub(crate) mod writers;
pub use codec::{read_ipc_compressed, CompressionCodec, ShuffleBlockWriter};
pub use comet_partitioning::CometPartitioning;
diff --git a/native/core/src/execution/shuffle/metrics.rs b/native/shuffle/src/metrics.rs
similarity index 84%
rename from native/core/src/execution/shuffle/metrics.rs
rename to native/shuffle/src/metrics.rs
index 33b51c3cd..1aba4677d 100644
--- a/native/core/src/execution/shuffle/metrics.rs
+++ b/native/shuffle/src/metrics.rs
@@ -19,34 +19,34 @@ use datafusion::physical_plan::metrics::{
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
};
-pub(super) struct ShufflePartitionerMetrics {
+pub(crate) struct ShufflePartitionerMetrics {
/// metrics
- pub(super) baseline: BaselineMetrics,
+ pub(crate) baseline: BaselineMetrics,
/// Time to perform repartitioning
- pub(super) repart_time: Time,
+ pub(crate) repart_time: Time,
/// Time encoding batches to IPC format
- pub(super) encode_time: Time,
+ pub(crate) encode_time: Time,
/// Time spent writing to disk. Maps to "shuffleWriteTime" in Spark SQL Metrics.
- pub(super) write_time: Time,
+ pub(crate) write_time: Time,
/// Number of input batches
- pub(super) input_batches: Count,
+ pub(crate) input_batches: Count,
/// count of spills during the execution of the operator
- pub(super) spill_count: Count,
+ pub(crate) spill_count: Count,
/// total spilled bytes during the execution of the operator
- pub(super) spilled_bytes: Count,
+ pub(crate) spilled_bytes: Count,
/// The original size of spilled data. Different to `spilled_bytes` because of compression.
- pub(super) data_size: Count,
+ pub(crate) data_size: Count,
}
impl ShufflePartitionerMetrics {
- pub(super) fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
+ pub(crate) fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
Self {
baseline: BaselineMetrics::new(metrics, partition),
repart_time: MetricBuilder::new(metrics).subset_time("repart_time", partition),
diff --git a/native/core/src/execution/shuffle/partitioners/mod.rs b/native/shuffle/src/partitioners/mod.rs
similarity index 83%
rename from native/core/src/execution/shuffle/partitioners/mod.rs
rename to native/shuffle/src/partitioners/mod.rs
index b9058f66f..a6d589677 100644
--- a/native/core/src/execution/shuffle/partitioners/mod.rs
+++ b/native/shuffle/src/partitioners/mod.rs
@@ -22,12 +22,12 @@ mod single_partition;
use arrow::record_batch::RecordBatch;
use datafusion::common::Result;
-pub(super) use multi_partition::MultiPartitionShuffleRepartitioner;
-pub(super) use partitioned_batch_iterator::PartitionedBatchIterator;
-pub(super) use single_partition::SinglePartitionShufflePartitioner;
+pub(crate) use multi_partition::MultiPartitionShuffleRepartitioner;
+pub(crate) use partitioned_batch_iterator::PartitionedBatchIterator;
+pub(crate) use single_partition::SinglePartitionShufflePartitioner;
#[async_trait::async_trait]
-pub(super) trait ShufflePartitioner: Send + Sync {
+pub(crate) trait ShufflePartitioner: Send + Sync {
/// Insert a batch into the partitioner
async fn insert_batch(&mut self, batch: RecordBatch) -> Result<()>;
/// Write shuffle data and shuffle index file to disk
diff --git a/native/core/src/execution/shuffle/partitioners/multi_partition.rs b/native/shuffle/src/partitioners/multi_partition.rs
similarity index 98%
rename from native/core/src/execution/shuffle/partitioners/multi_partition.rs
rename to native/shuffle/src/partitioners/multi_partition.rs
index 9c366ad46..42290c551 100644
--- a/native/core/src/execution/shuffle/partitioners/multi_partition.rs
+++ b/native/shuffle/src/partitioners/multi_partition.rs
@@ -15,16 +15,13 @@
// specific language governing permissions and limitations
// under the License.
-use crate::execution::shuffle::metrics::ShufflePartitionerMetrics;
-use crate::execution::shuffle::partitioners::partitioned_batch_iterator::{
+use crate::metrics::ShufflePartitionerMetrics;
+use crate::partitioners::partitioned_batch_iterator::{
PartitionedBatchIterator, PartitionedBatchesProducer,
};
-use crate::execution::shuffle::partitioners::ShufflePartitioner;
-use crate::execution::shuffle::writers::{BufBatchWriter, PartitionWriter};
-use crate::execution::shuffle::{
- comet_partitioning, CometPartitioning, CompressionCodec, ShuffleBlockWriter,
-};
-use crate::execution::tracing::{with_trace, with_trace_async};
+use crate::partitioners::ShufflePartitioner;
+use crate::writers::{BufBatchWriter, PartitionWriter};
+use crate::{comet_partitioning, CometPartitioning, CompressionCodec, ShuffleBlockWriter};
use arrow::array::{ArrayRef, RecordBatch};
use arrow::datatypes::SchemaRef;
use datafusion::common::utils::proxy::VecAllocExt;
@@ -32,6 +29,7 @@ use datafusion::common::DataFusionError;
use datafusion::execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::physical_plan::metrics::Time;
+use datafusion_comet_common::tracing::{with_trace, with_trace_async};
use datafusion_comet_spark_expr::murmur3::create_murmur3_hashes;
use itertools::Itertools;
use std::fmt;
diff --git a/native/core/src/execution/shuffle/partitioners/partitioned_batch_iterator.rs b/native/shuffle/src/partitioners/partitioned_batch_iterator.rs
similarity index 100%
rename from native/core/src/execution/shuffle/partitioners/partitioned_batch_iterator.rs
rename to native/shuffle/src/partitioners/partitioned_batch_iterator.rs
diff --git a/native/core/src/execution/shuffle/partitioners/single_partition.rs b/native/shuffle/src/partitioners/single_partition.rs
similarity index 96%
rename from native/core/src/execution/shuffle/partitioners/single_partition.rs
rename to native/shuffle/src/partitioners/single_partition.rs
index eeca4458c..5801ef613 100644
--- a/native/core/src/execution/shuffle/partitioners/single_partition.rs
+++ b/native/shuffle/src/partitioners/single_partition.rs
@@ -15,10 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-use crate::execution::shuffle::metrics::ShufflePartitionerMetrics;
-use crate::execution::shuffle::partitioners::ShufflePartitioner;
-use crate::execution::shuffle::writers::BufBatchWriter;
-use crate::execution::shuffle::{CompressionCodec, ShuffleBlockWriter};
+use crate::metrics::ShufflePartitionerMetrics;
+use crate::partitioners::ShufflePartitioner;
+use crate::writers::BufBatchWriter;
+use crate::{CompressionCodec, ShuffleBlockWriter};
use arrow::array::RecordBatch;
use arrow::datatypes::SchemaRef;
use datafusion::common::DataFusionError;
diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/shuffle/src/shuffle_writer.rs
similarity index 98%
rename from native/core/src/execution/shuffle/shuffle_writer.rs
rename to native/shuffle/src/shuffle_writer.rs
index fe1bf0fcc..e649aaac6 100644
--- a/native/core/src/execution/shuffle/shuffle_writer.rs
+++ b/native/shuffle/src/shuffle_writer.rs
@@ -17,12 +17,11 @@
//! Defines the External shuffle repartition plan.
-use crate::execution::shuffle::metrics::ShufflePartitionerMetrics;
-use crate::execution::shuffle::partitioners::{
+use crate::metrics::ShufflePartitionerMetrics;
+use crate::partitioners::{
MultiPartitionShuffleRepartitioner, ShufflePartitioner, SinglePartitionShufflePartitioner,
};
-use crate::execution::shuffle::{CometPartitioning, CompressionCodec};
-use crate::execution::tracing::with_trace_async;
+use crate::{CometPartitioning, CompressionCodec};
use async_trait::async_trait;
use datafusion::common::exec_datafusion_err;
use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
@@ -39,6 +38,7 @@ use datafusion::{
Statistics,
},
};
+use datafusion_comet_common::tracing::with_trace_async;
use futures::{StreamExt, TryFutureExt, TryStreamExt};
use std::{
any::Any,
@@ -265,7 +265,7 @@ async fn external_shuffle(
#[cfg(test)]
mod test {
use super::*;
- use crate::execution::shuffle::{read_ipc_compressed, ShuffleBlockWriter};
+ use crate::{read_ipc_compressed, ShuffleBlockWriter};
use arrow::array::{Array, StringArray, StringBuilder};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
@@ -591,7 +591,7 @@ mod test {
#[test]
#[cfg_attr(miri, ignore)]
fn test_batch_coalescing_reduces_size() {
- use crate::execution::shuffle::writers::BufBatchWriter;
+ use crate::writers::BufBatchWriter;
use arrow::array::Int32Array;
// Create a wide schema to amplify per-block schema overhead
diff --git a/native/core/src/execution/shuffle/spark_unsafe/list.rs b/native/shuffle/src/spark_unsafe/list.rs
similarity index 98%
rename from native/core/src/execution/shuffle/spark_unsafe/list.rs
rename to native/shuffle/src/spark_unsafe/list.rs
index d9c93b1c6..4eb293895 100644
--- a/native/core/src/execution/shuffle/spark_unsafe/list.rs
+++ b/native/shuffle/src/spark_unsafe/list.rs
@@ -15,14 +15,11 @@
// specific language governing permissions and limitations
// under the License.
-use crate::{
- errors::CometError,
- execution::shuffle::spark_unsafe::{
- map::append_map_elements,
- row::{
- append_field, downcast_builder_ref, impl_primitive_accessors, SparkUnsafeObject,
- SparkUnsafeRow,
- },
+use crate::spark_unsafe::{
+ map::append_map_elements,
+ row::{
+ append_field, downcast_builder_ref, impl_primitive_accessors, SparkUnsafeObject,
+ SparkUnsafeRow,
},
};
use arrow::array::{
@@ -34,6 +31,7 @@ use arrow::array::{
MapBuilder,
};
use arrow::datatypes::{DataType, TimeUnit};
+use datafusion_comet_jni_bridge::errors::CometError;
/// Generates bulk append methods for primitive types in SparkUnsafeArray.
///
diff --git a/native/core/src/execution/shuffle/spark_unsafe/map.rs b/native/shuffle/src/spark_unsafe/map.rs
similarity index 97%
rename from native/core/src/execution/shuffle/spark_unsafe/map.rs
rename to native/shuffle/src/spark_unsafe/map.rs
index 19b67c43d..57444cee7 100644
--- a/native/core/src/execution/shuffle/spark_unsafe/map.rs
+++ b/native/shuffle/src/spark_unsafe/map.rs
@@ -15,12 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-use crate::{
- errors::CometError,
- execution::shuffle::spark_unsafe::list::{append_to_builder, SparkUnsafeArray},
-};
+use crate::spark_unsafe::list::{append_to_builder, SparkUnsafeArray};
use arrow::array::builder::{ArrayBuilder, MapBuilder, MapFieldNames};
use arrow::datatypes::{DataType, FieldRef};
+use datafusion_comet_jni_bridge::errors::CometError;
pub struct SparkUnsafeMap {
pub(crate) keys: SparkUnsafeArray,
diff --git a/native/core/src/execution/shuffle/spark_unsafe/mod.rs b/native/shuffle/src/spark_unsafe/mod.rs
similarity index 100%
rename from native/core/src/execution/shuffle/spark_unsafe/mod.rs
rename to native/shuffle/src/spark_unsafe/mod.rs
diff --git a/native/core/src/execution/shuffle/spark_unsafe/row.rs b/native/shuffle/src/spark_unsafe/row.rs
similarity index 99%
rename from native/core/src/execution/shuffle/spark_unsafe/row.rs
rename to native/shuffle/src/spark_unsafe/row.rs
index 7ebf18d8d..da980af8f 100644
--- a/native/core/src/execution/shuffle/spark_unsafe/row.rs
+++ b/native/shuffle/src/spark_unsafe/row.rs
@@ -17,18 +17,10 @@
//! Utils for supporting native sort-based columnar shuffle.
-use crate::{
- errors::CometError,
- execution::{
- shuffle::{
- codec::{Checksum, ShuffleBlockWriter},
- spark_unsafe::{
- list::{append_list_element, SparkUnsafeArray},
- map::{append_map_elements, get_map_key_value_fields, SparkUnsafeMap},
- },
- },
- utils::bytes_to_i128,
- },
+use crate::codec::{Checksum, ShuffleBlockWriter};
+use crate::spark_unsafe::{
+ list::{append_list_element, SparkUnsafeArray},
+ map::{append_map_elements, get_map_key_value_fields, SparkUnsafeMap},
};
use arrow::array::{
builder::{
@@ -44,6 +36,8 @@ use arrow::compute::cast;
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::error::ArrowError;
use datafusion::physical_plan::metrics::Time;
+use datafusion_comet_common::bytes_to_i128;
+use datafusion_comet_jni_bridge::errors::CometError;
use jni::sys::{jint, jlong};
use std::{
fs::OpenOptions,
@@ -403,7 +397,7 @@ macro_rules! get_field_builder {
}
// Expose the macro for other modules.
-use crate::execution::shuffle::CompressionCodec;
+use crate::CompressionCodec;
pub(crate) use downcast_builder_ref;
/// Appends field of row to the given struct builder. `dt` is the data type of the field.
diff --git a/native/core/src/execution/shuffle/writers/buf_batch_writer.rs b/native/shuffle/src/writers/buf_batch_writer.rs
similarity index 99%
rename from native/core/src/execution/shuffle/writers/buf_batch_writer.rs
rename to native/shuffle/src/writers/buf_batch_writer.rs
index 8d056d7bb..6344a8e5f 100644
--- a/native/core/src/execution/shuffle/writers/buf_batch_writer.rs
+++ b/native/shuffle/src/writers/buf_batch_writer.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use crate::execution::shuffle::ShuffleBlockWriter;
+use crate::ShuffleBlockWriter;
use arrow::array::RecordBatch;
use arrow::compute::kernels::coalesce::BatchCoalescer;
use datafusion::physical_plan::metrics::Time;
diff --git a/native/core/src/execution/shuffle/writers/mod.rs b/native/shuffle/src/writers/mod.rs
similarity index 89%
rename from native/core/src/execution/shuffle/writers/mod.rs
rename to native/shuffle/src/writers/mod.rs
index d41363b7f..b58989e46 100644
--- a/native/core/src/execution/shuffle/writers/mod.rs
+++ b/native/shuffle/src/writers/mod.rs
@@ -18,5 +18,5 @@
mod buf_batch_writer;
mod partition_writer;
-pub(super) use buf_batch_writer::BufBatchWriter;
-pub(super) use partition_writer::PartitionWriter;
+pub(crate) use buf_batch_writer::BufBatchWriter;
+pub(crate) use partition_writer::PartitionWriter;
diff --git a/native/core/src/execution/shuffle/writers/partition_writer.rs b/native/shuffle/src/writers/partition_writer.rs
similarity index 94%
rename from native/core/src/execution/shuffle/writers/partition_writer.rs
rename to native/shuffle/src/writers/partition_writer.rs
index 7c2dbe044..48017871d 100644
--- a/native/core/src/execution/shuffle/writers/partition_writer.rs
+++ b/native/shuffle/src/writers/partition_writer.rs
@@ -15,10 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-use crate::execution::shuffle::metrics::ShufflePartitionerMetrics;
-use crate::execution::shuffle::partitioners::PartitionedBatchIterator;
-use crate::execution::shuffle::writers::buf_batch_writer::BufBatchWriter;
-use crate::execution::shuffle::ShuffleBlockWriter;
+use crate::metrics::ShufflePartitionerMetrics;
+use crate::partitioners::PartitionedBatchIterator;
+use crate::writers::buf_batch_writer::BufBatchWriter;
+use crate::ShuffleBlockWriter;
use datafusion::common::DataFusionError;
use datafusion::execution::disk_manager::RefCountedTempFile;
use datafusion::execution::runtime_env::RuntimeEnv;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]