This is an automated email from the ASF dual-hosted git repository.
milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new f30b81b7 feat: add config option for skipping arrow ipc read validation (#1374)
f30b81b7 is described below
commit f30b81b7de6568dcf4a323675e8e90c1613ab7a7
Author: jgrim <[email protected]>
AuthorDate: Sat Jan 17 20:18:17 2026 +0100
feat: add config option for skipping arrow ipc read validation (#1374)
* feat: add config option for skipping arrow ipc read validation
* fix: remove unused dependencies
---
Cargo.toml | 2 --
ballista/core/Cargo.toml | 2 ++
ballista/core/src/execution_plans/shuffle_reader.rs | 14 +++++++++++---
ballista/executor/Cargo.toml | 3 ++-
ballista/executor/src/flight_service.rs | 8 ++++++--
5 files changed, 21 insertions(+), 8 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 11ce13ae..833d47f1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -33,8 +33,6 @@ rust-version = "1.88.0"
arrow = { version = "57", features = ["ipc_compression"] }
arrow-flight = { version = "57", features = ["flight-sql-experimental"] }
clap = { version = "4.5", features = ["derive", "cargo"] }
-configure_me = { version = "0.4.0" }
-configure_me_codegen = { version = "0.4.4" }
datafusion = "51.0.0"
datafusion-cli = "51.0.0"
datafusion-proto = "51.0.0"
diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml
index dbc362b7..c7e38574 100644
--- a/ballista/core/Cargo.toml
+++ b/ballista/core/Cargo.toml
@@ -35,7 +35,9 @@ exclude = ["*.proto"]
rustc-args = ["--cfg", "docsrs"]
[features]
+arrow-ipc-optimizations = []
build-binary = ["aws-config", "aws-credential-types", "clap", "object_store"]
+default = ["arrow-ipc-optimizations"]
docsrs = []
# Used for testing ONLY: causes all values to hash to the same value (test for collisions)
force_hash_collisions = ["datafusion/force_hash_collisions"]
diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs b/ballista/core/src/execution_plans/shuffle_reader.rs
index 2ebc2fd3..5063d1d7 100644
--- a/ballista/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/core/src/execution_plans/shuffle_reader.rs
@@ -545,9 +545,17 @@ fn fetch_partition_local_inner(
BallistaError::General(format!("Failed to open partition file at
{path}: {e:?}"))
})?;
let file = BufReader::new(file);
- let reader = StreamReader::try_new(file, None).map_err(|e| {
- BallistaError::General(format!("Failed to new arrow FileReader at
{path}: {e:?}"))
- })?;
+ // Safety: setting `skip_validation` requires `unsafe`, user assures data
is valid
+ let reader = unsafe {
+ StreamReader::try_new(file, None)
+ .map_err(|e| {
+ BallistaError::General(format!(
+ "Failed to create new arrow StreamReader at {path}: {e:?}"
+ ))
+ })?
+ .with_skip_validation(cfg!(feature = "arrow-ipc-optimizations"))
+ };
+
Ok(reader)
}
diff --git a/ballista/executor/Cargo.toml b/ballista/executor/Cargo.toml
index f8f25fda..32dd8fa0 100644
--- a/ballista/executor/Cargo.toml
+++ b/ballista/executor/Cargo.toml
@@ -33,8 +33,9 @@ path = "src/bin/main.rs"
required-features = ["build-binary"]
[features]
+arrow-ipc-optimizations = []
build-binary = ["clap", "tracing-subscriber", "tracing-appender", "tracing",
"ballista-core/build-binary"]
-default = ["build-binary", "mimalloc"]
+default = ["arrow-ipc-optimizations", "build-binary", "mimalloc"]
[dependencies]
arrow = { workspace = true }
diff --git a/ballista/executor/src/flight_service.rs b/ballista/executor/src/flight_service.rs
index 635ec699..a961ee6d 100644
--- a/ballista/executor/src/flight_service.rs
+++ b/ballista/executor/src/flight_service.rs
@@ -105,8 +105,12 @@ impl FlightService for BallistaFlightService {
})
.map_err(|e| from_ballista_err(&e))?;
let file = BufReader::new(file);
- let reader =
- StreamReader::try_new(file, None).map_err(|e|
from_arrow_err(&e))?;
+ // Safety: setting `skip_validation` requires `unsafe`, user
assures data is valid
+ let reader = unsafe {
+ StreamReader::try_new(file, None)
+ .map_err(|e| from_arrow_err(&e))?
+ .with_skip_validation(cfg!(feature =
"arrow-ipc-optimizations"))
+ };
let (tx, rx) = channel(2);
let schema = reader.schema();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]