This is an automated email from the ASF dual-hosted git repository.

milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new f30b81b7 feat: add config option for skipping arrow ipc read validation (#1374)
f30b81b7 is described below

commit f30b81b7de6568dcf4a323675e8e90c1613ab7a7
Author: jgrim <[email protected]>
AuthorDate: Sat Jan 17 20:18:17 2026 +0100

    feat: add config option for skipping arrow ipc read validation (#1374)
    
    * feat: add config option for skipping arrow ipc read validation
    
    * fix: remove unused dependencies
---
 Cargo.toml                                          |  2 --
 ballista/core/Cargo.toml                            |  2 ++
 ballista/core/src/execution_plans/shuffle_reader.rs | 14 +++++++++++---
 ballista/executor/Cargo.toml                        |  3 ++-
 ballista/executor/src/flight_service.rs             |  8 ++++++--
 5 files changed, 21 insertions(+), 8 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 11ce13ae..833d47f1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -33,8 +33,6 @@ rust-version = "1.88.0"
 arrow = { version = "57", features = ["ipc_compression"] }
 arrow-flight = { version = "57", features = ["flight-sql-experimental"] }
 clap = { version = "4.5", features = ["derive", "cargo"] }
-configure_me = { version = "0.4.0" }
-configure_me_codegen = { version = "0.4.4" }
 datafusion = "51.0.0"
 datafusion-cli = "51.0.0"
 datafusion-proto = "51.0.0"
diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml
index dbc362b7..c7e38574 100644
--- a/ballista/core/Cargo.toml
+++ b/ballista/core/Cargo.toml
@@ -35,7 +35,9 @@ exclude = ["*.proto"]
 rustc-args = ["--cfg", "docsrs"]
 
 [features]
+arrow-ipc-optimizations = []
 build-binary = ["aws-config", "aws-credential-types", "clap", "object_store"]
+default = ["arrow-ipc-optimizations"]
 docsrs = []
 # Used for testing ONLY: causes all values to hash to the same value (test for collisions)
 force_hash_collisions = ["datafusion/force_hash_collisions"]
diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs b/ballista/core/src/execution_plans/shuffle_reader.rs
index 2ebc2fd3..5063d1d7 100644
--- a/ballista/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/core/src/execution_plans/shuffle_reader.rs
@@ -545,9 +545,17 @@ fn fetch_partition_local_inner(
         BallistaError::General(format!("Failed to open partition file at {path}: {e:?}"))
     })?;
     let file = BufReader::new(file);
-    let reader = StreamReader::try_new(file, None).map_err(|e| {
-        BallistaError::General(format!("Failed to new arrow FileReader at {path}: {e:?}"))
-    })?;
+    // Safety: setting `skip_validation` requires `unsafe`, user assures data is valid
+    let reader = unsafe {
+        StreamReader::try_new(file, None)
+            .map_err(|e| {
+                BallistaError::General(format!(
+                    "Failed to create new arrow StreamReader at {path}: {e:?}"
+                ))
+            })?
+            .with_skip_validation(cfg!(feature = "arrow-ipc-optimizations"))
+    };
+
     Ok(reader)
 }
 
diff --git a/ballista/executor/Cargo.toml b/ballista/executor/Cargo.toml
index f8f25fda..32dd8fa0 100644
--- a/ballista/executor/Cargo.toml
+++ b/ballista/executor/Cargo.toml
@@ -33,8 +33,9 @@ path = "src/bin/main.rs"
 required-features = ["build-binary"]
 
 [features]
+arrow-ipc-optimizations = []
 build-binary = ["clap", "tracing-subscriber", "tracing-appender", "tracing", "ballista-core/build-binary"]
-default = ["build-binary", "mimalloc"]
+default = ["arrow-ipc-optimizations", "build-binary", "mimalloc"]
 
 [dependencies]
 arrow = { workspace = true }
diff --git a/ballista/executor/src/flight_service.rs b/ballista/executor/src/flight_service.rs
index 635ec699..a961ee6d 100644
--- a/ballista/executor/src/flight_service.rs
+++ b/ballista/executor/src/flight_service.rs
@@ -105,8 +105,12 @@ impl FlightService for BallistaFlightService {
                     })
                     .map_err(|e| from_ballista_err(&e))?;
                 let file = BufReader::new(file);
-                let reader =
-                    StreamReader::try_new(file, None).map_err(|e| from_arrow_err(&e))?;
+                // Safety: setting `skip_validation` requires `unsafe`, user assures data is valid
+                let reader = unsafe {
+                    StreamReader::try_new(file, None)
+                        .map_err(|e| from_arrow_err(&e))?
+                        .with_skip_validation(cfg!(feature = "arrow-ipc-optimizations"))
+                };
 
                 let (tx, rx) = channel(2);
                 let schema = reader.schema();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to