This is an automated email from the ASF dual-hosted git repository.
hcr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/mahout.git
The following commit(s) were added to refs/heads/main by this push:
new e9b1e830b [QDP] I/O errors: expose underlying cause via Error::source() (#1026)
e9b1e830b is described below
commit e9b1e830b24724b0e0a5842a6f4fc9a5033d2c3c
Author: KUAN-HAO HUANG <[email protected]>
AuthorDate: Mon Feb 9 15:55:31 2026 +0800
[QDP] I/O errors: expose underlying cause via Error::source() (#1026)
---
qdp/qdp-core/src/error.rs | 27 ++++++++++++++++++++++++
qdp/qdp-core/src/io.rs | 18 ++++++++++------
qdp/qdp-core/src/readers/arrow_ipc.rs | 19 ++++++++++-------
qdp/qdp-core/src/readers/numpy.rs | 20 +++++++++++-------
qdp/qdp-core/src/readers/parquet.rs | 38 +++++++++++++++++++++-------------
qdp/qdp-core/src/readers/tensorflow.rs | 11 +++++++---
qdp/qdp-core/src/readers/torch.rs | 13 +++++++-----
7 files changed, 103 insertions(+), 43 deletions(-)
diff --git a/qdp/qdp-core/src/error.rs b/qdp/qdp-core/src/error.rs
index 94225357b..09392bf55 100644
--- a/qdp/qdp-core/src/error.rs
+++ b/qdp/qdp-core/src/error.rs
@@ -37,6 +37,14 @@ pub enum MahoutError {
#[error("I/O error: {0}")]
Io(String),
+ /// I/O error with underlying cause (enables Error::source() and downcast).
+ #[error("I/O error: {message}")]
+ IoWithSource {
+ message: String,
+ #[source]
+ source: std::io::Error,
+ },
+
#[error("Not implemented: {0}")]
NotImplemented(String),
}
@@ -64,3 +72,22 @@ pub fn cuda_error_to_string(code: i32) -> &'static str {
_ => "Unknown CUDA error",
}
}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::error::Error as StdError;
+    use std::io;
+
+    /// `IoWithSource` must hand back the exact wrapped `io::Error` through `source()`.
+    #[test]
+    fn io_with_source_provides_source_and_downcast() {
+        let inner = io::Error::new(io::ErrorKind::NotFound, "file not found");
+        let message = format!("open failed: {}", inner);
+        let err = MahoutError::IoWithSource { message, source: inner };
+        assert!(err.to_string().contains("open failed"));
+        let source = err.source().expect("IoWithSource must have source");
+        let io_err = source.downcast_ref::<io::Error>().expect("source must be io::Error");
+        assert_eq!(io_err.kind(), io::ErrorKind::NotFound);
+    }
+}
diff --git a/qdp/qdp-core/src/io.rs b/qdp/qdp-core/src/io.rs
index ab3e903da..346e6a4ab 100644
--- a/qdp/qdp-core/src/io.rs
+++ b/qdp/qdp-core/src/io.rs
@@ -98,8 +98,10 @@ pub fn write_parquet<P: AsRef<Path>>(
let batch = RecordBatch::try_new(schema.clone(), vec![array_ref])
.map_err(|e| MahoutError::Io(format!("Failed to create RecordBatch:
{}", e)))?;
- let file = File::create(path.as_ref())
- .map_err(|e| MahoutError::Io(format!("Failed to create Parquet file:
{}", e)))?;
+ let file = File::create(path.as_ref()).map_err(|e|
MahoutError::IoWithSource {
+ message: format!("Failed to create Parquet file: {}", e),
+ source: e,
+ })?;
let props = WriterProperties::builder().build();
let mut writer = ArrowWriter::try_new(file, schema, Some(props))
@@ -120,8 +122,10 @@ pub fn write_parquet<P: AsRef<Path>>(
///
/// Returns one array per row group for zero-copy access.
pub fn read_parquet_to_arrow<P: AsRef<Path>>(path: P) ->
Result<Vec<Float64Array>> {
- let file = File::open(path.as_ref())
- .map_err(|e| MahoutError::Io(format!("Failed to open Parquet file:
{}", e)))?;
+ let file = File::open(path.as_ref()).map_err(|e| MahoutError::IoWithSource
{
+ message: format!("Failed to open Parquet file: {}", e),
+ source: e,
+ })?;
let builder = ParquetRecordBatchReaderBuilder::try_new(file)
.map_err(|e| MahoutError::Io(format!("Failed to create Parquet reader:
{}", e)))?;
@@ -193,8 +197,10 @@ pub fn write_arrow_to_parquet<P: AsRef<Path>>(
let batch = RecordBatch::try_new(schema.clone(), vec![array_ref])
.map_err(|e| MahoutError::Io(format!("Failed to create RecordBatch:
{}", e)))?;
- let file = File::create(path.as_ref())
- .map_err(|e| MahoutError::Io(format!("Failed to create Parquet file:
{}", e)))?;
+ let file = File::create(path.as_ref()).map_err(|e|
MahoutError::IoWithSource {
+ message: format!("Failed to create Parquet file: {}", e),
+ source: e,
+ })?;
let props = WriterProperties::builder().build();
let mut writer = ArrowWriter::try_new(file, schema, Some(props))
diff --git a/qdp/qdp-core/src/readers/arrow_ipc.rs
b/qdp/qdp-core/src/readers/arrow_ipc.rs
index 57064f129..f2c781e44 100644
--- a/qdp/qdp-core/src/readers/arrow_ipc.rs
+++ b/qdp/qdp-core/src/readers/arrow_ipc.rs
@@ -49,11 +49,14 @@ impl ArrowIPCReader {
)));
}
Err(e) => {
- return Err(MahoutError::Io(format!(
- "Failed to check if Arrow IPC file exists at {}: {}",
- path.display(),
- e
- )));
+ return Err(MahoutError::IoWithSource {
+ message: format!(
+ "Failed to check if Arrow IPC file exists at {}: {}",
+ path.display(),
+ e
+ ),
+ source: e,
+ });
}
Ok(true) => {}
}
@@ -74,8 +77,10 @@ impl DataReader for ArrowIPCReader {
}
self.read = true;
- let file = File::open(&self.path)
- .map_err(|e| MahoutError::Io(format!("Failed to open Arrow IPC
file: {}", e)))?;
+ let file = File::open(&self.path).map_err(|e|
MahoutError::IoWithSource {
+ message: format!("Failed to open Arrow IPC file: {}", e),
+ source: e,
+ })?;
let reader = ArrowFileReader::try_new(file, None)
.map_err(|e| MahoutError::Io(format!("Failed to create Arrow IPC
reader: {}", e)))?;
diff --git a/qdp/qdp-core/src/readers/numpy.rs
b/qdp/qdp-core/src/readers/numpy.rs
index aecf4cf12..6187fe5cd 100644
--- a/qdp/qdp-core/src/readers/numpy.rs
+++ b/qdp/qdp-core/src/readers/numpy.rs
@@ -65,11 +65,14 @@ impl NumpyReader {
)));
}
Err(e) => {
- return Err(MahoutError::Io(format!(
- "Failed to check if NumPy file exists at {}: {}",
- path.display(),
- e
- )));
+ return Err(MahoutError::IoWithSource {
+ message: format!(
+ "Failed to check if NumPy file exists at {}: {}",
+ path.display(),
+ e
+ ),
+ source: e,
+ });
}
Ok(true) => {}
}
@@ -92,9 +95,10 @@ impl DataReader for NumpyReader {
// Read the .npy file
let array: Array2<f64> = ndarray_npy::read_npy(&self.path).map_err(|e|
match e {
- ReadNpyError::Io(io_err) => {
- MahoutError::Io(format!("Failed to read NumPy file: {}",
io_err))
- }
+ ReadNpyError::Io(io_err) => MahoutError::IoWithSource {
+ message: format!("Failed to read NumPy file: {}", io_err),
+ source: io_err,
+ },
_ => MahoutError::InvalidInput(format!("Failed to parse NumPy
file: {}", e)),
})?;
diff --git a/qdp/qdp-core/src/readers/parquet.rs
b/qdp/qdp-core/src/readers/parquet.rs
index db1148c55..91bb0007e 100644
--- a/qdp/qdp-core/src/readers/parquet.rs
+++ b/qdp/qdp-core/src/readers/parquet.rs
@@ -51,17 +51,22 @@ impl ParquetReader {
)));
}
Err(e) => {
- return Err(MahoutError::Io(format!(
- "Failed to check if Parquet file exists at {}: {}",
- path.display(),
- e
- )));
+ return Err(MahoutError::IoWithSource {
+ message: format!(
+ "Failed to check if Parquet file exists at {}: {}",
+ path.display(),
+ e
+ ),
+ source: e,
+ });
}
Ok(true) => {}
}
- let file = File::open(path)
- .map_err(|e| MahoutError::Io(format!("Failed to open Parquet file:
{}", e)))?;
+ let file = File::open(path).map_err(|e| MahoutError::IoWithSource {
+ message: format!("Failed to open Parquet file: {}", e),
+ source: e,
+ })?;
let builder = ParquetRecordBatchReaderBuilder::try_new(file)
.map_err(|e| MahoutError::Io(format!("Failed to create Parquet
reader: {}", e)))?;
@@ -262,17 +267,22 @@ impl ParquetStreamingReader {
)));
}
Err(e) => {
- return Err(MahoutError::Io(format!(
- "Failed to check if Parquet file exists at {}: {}",
- path.display(),
- e
- )));
+ return Err(MahoutError::IoWithSource {
+ message: format!(
+ "Failed to check if Parquet file exists at {}: {}",
+ path.display(),
+ e
+ ),
+ source: e,
+ });
}
Ok(true) => {}
}
- let file = File::open(path)
- .map_err(|e| MahoutError::Io(format!("Failed to open Parquet file:
{}", e)))?;
+ let file = File::open(path).map_err(|e| MahoutError::IoWithSource {
+ message: format!("Failed to open Parquet file: {}", e),
+ source: e,
+ })?;
let builder = ParquetRecordBatchReaderBuilder::try_new(file)
.map_err(|e| MahoutError::Io(format!("Failed to create Parquet
reader: {}", e)))?;
diff --git a/qdp/qdp-core/src/readers/tensorflow.rs
b/qdp/qdp-core/src/readers/tensorflow.rs
index d2ed9e103..8ba8efcbd 100644
--- a/qdp/qdp-core/src/readers/tensorflow.rs
+++ b/qdp/qdp-core/src/readers/tensorflow.rs
@@ -49,12 +49,17 @@ impl TensorFlowReader {
/// Create a new TensorFlow reader from a file path.
pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
// Read entire file into memory (single read to avoid multiple I/O
operations)
- let mut file = File::open(path.as_ref())
- .map_err(|e| MahoutError::Io(format!("Failed to open TensorFlow
file: {}", e)))?;
+ let mut file = File::open(path.as_ref()).map_err(|e|
MahoutError::IoWithSource {
+ message: format!("Failed to open TensorFlow file: {}", e),
+ source: e,
+ })?;
let mut buffer = Vec::new();
file.read_to_end(&mut buffer)
- .map_err(|e| MahoutError::Io(format!("Failed to read TensorFlow
file: {}", e)))?;
+ .map_err(|e| MahoutError::IoWithSource {
+ message: format!("Failed to read TensorFlow file: {}", e),
+ source: e,
+ })?;
// Use Bytes for decode input; with build.rs config.bytes(...) this
avoids copying tensor_content during decode
let buffer = Bytes::from(buffer);
diff --git a/qdp/qdp-core/src/readers/torch.rs
b/qdp/qdp-core/src/readers/torch.rs
index 27fd41057..d031ff529 100644
--- a/qdp/qdp-core/src/readers/torch.rs
+++ b/qdp/qdp-core/src/readers/torch.rs
@@ -51,11 +51,14 @@ impl TorchReader {
)));
}
Err(e) => {
- return Err(MahoutError::Io(format!(
- "Failed to check if PyTorch file exists at {}: {}",
- path.display(),
- e
- )));
+ return Err(MahoutError::IoWithSource {
+ message: format!(
+ "Failed to check if PyTorch file exists at {}: {}",
+ path.display(),
+ e
+ ),
+ source: e,
+ });
}
Ok(true) => {}
}