This is an automated email from the ASF dual-hosted git repository.

adriangb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
     new 3d5863b059 Use DataFusionError instead of ArrowError in FileOpenFuture (#17397)
3d5863b059 is described below

commit 3d5863b05950da382df7e070f457d8166618d270
Author: Adrian Garcia Badaracco <1755071+adria...@users.noreply.github.com>
AuthorDate: Thu Sep 4 13:23:09 2025 -0500

    Use DataFusionError instead of ArrowError in FileOpenFuture (#17397)
---
 .../src/datasource/physical_plan/arrow_file.rs | 15 +++++----
 .../physical_optimizer/filter_pushdown/util.rs | 12 ++------
 datafusion/datasource-avro/src/source.rs       |  8 +++--
 datafusion/datasource-csv/src/source.rs        |  9 ++++--
 datafusion/datasource-json/src/source.rs       |  9 ++++--
 datafusion/datasource-parquet/src/opener.rs    | 36 ++++++++--------------
 datafusion/datasource/src/file_stream.rs       | 19 +++++-------
 docs/source/library-user-guide/upgrading.md    | 20 ++++++++++++
 8 files changed, 69 insertions(+), 59 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
index d0af96329b..68334a4a18 100644
--- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs
+++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -26,7 +26,7 @@ use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
 use arrow::buffer::Buffer;
 use arrow::datatypes::SchemaRef;
 use arrow_ipc::reader::FileDecoder;
-use datafusion_common::Statistics;
+use datafusion_common::{exec_datafusion_err, Statistics};
 use datafusion_datasource::file::FileSource;
 use datafusion_datasource::file_scan_config::FileScanConfig;
 use datafusion_datasource::PartitionedFile;
@@ -140,7 +140,9 @@ impl FileOpener for ArrowOpener {
                     let arrow_reader = arrow::ipc::reader::FileReader::try_new(
                         file, projection,
                     )?;
-                    Ok(futures::stream::iter(arrow_reader).boxed())
+                    Ok(futures::stream::iter(arrow_reader)
+                        .map(|r| r.map_err(Into::into))
+                        .boxed())
                 }
                 GetResultPayload::Stream(_) => {
                     let bytes = r.bytes().await?;
@@ -148,7 +150,9 @@ impl FileOpener for ArrowOpener {
                     let arrow_reader = arrow::ipc::reader::FileReader::try_new(
                         cursor, projection,
                     )?;
-                    Ok(futures::stream::iter(arrow_reader).boxed())
+                    Ok(futures::stream::iter(arrow_reader)
+                        .map(|r| r.map_err(Into::into))
+                        .boxed())
                 }
             }
         }
@@ -179,9 +183,7 @@ impl FileOpener for ArrowOpener {
                         footer_buf[..footer_len].try_into().unwrap(),
                     )
                     .map_err(|err| {
-                        arrow::error::ArrowError::ParseError(format!(
-                            "Unable to get root as footer: {err:?}"
-                        ))
+                        exec_datafusion_err!("Unable to get root as footer: {err:?}")
                     })?;
                     // build decoder according to footer & projection
                     let schema =
@@ -248,6 +250,7 @@ impl FileOpener for ArrowOpener {
                                 .transpose()
                         }),
                     )
+                    .map(|r| r.map_err(Into::into))
                     .boxed())
                 }
             }
diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
index 64cee011cc..7d0020b2e9 100644
--- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use arrow::datatypes::SchemaRef;
-use arrow::error::ArrowError;
 use arrow::{array::RecordBatch, compute::concat_batches};
 use datafusion::{datasource::object_store::ObjectStoreUrl, physical_plan::PhysicalExpr};
 use datafusion_common::{config::ConfigOptions, internal_err, Result, Statistics};
@@ -39,7 +38,7 @@ use datafusion_physical_plan::{
     metrics::ExecutionPlanMetricsSet, DisplayAs, DisplayFormatType, ExecutionPlan,
     PlanProperties,
 };
-use futures::stream::BoxStream;
+use futures::StreamExt;
 use futures::{FutureExt, Stream};
 use object_store::ObjectStore;
 use std::{
@@ -93,12 +92,7 @@ impl FileOpener for TestOpener {
 
         let stream = TestStream::new(batches);
 
-        Ok((async {
-            let stream: BoxStream<'static, Result<RecordBatch, ArrowError>> =
-                Box::pin(stream);
-            Ok(stream)
-        })
-        .boxed())
+        Ok((async { Ok(stream.boxed()) }).boxed())
     }
 }
 
@@ -344,7 +338,7 @@ impl TestStream {
 }
 
 impl Stream for TestStream {
-    type Item = Result<RecordBatch, ArrowError>;
+    type Item = Result<RecordBatch>;
 
     fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         let next_batch = self.index.value();
diff --git a/datafusion/datasource-avro/src/source.rs b/datafusion/datasource-avro/src/source.rs
index 948049f5a7..da871837cd 100644
--- a/datafusion/datasource-avro/src/source.rs
+++ b/datafusion/datasource-avro/src/source.rs
@@ -169,12 +169,16 @@ mod private {
             match r.payload {
                 GetResultPayload::File(file, _) => {
                     let reader = config.open(file)?;
-                    Ok(futures::stream::iter(reader).boxed())
+                    Ok(futures::stream::iter(reader)
+                        .map(|r| r.map_err(Into::into))
+                        .boxed())
                 }
                 GetResultPayload::Stream(_) => {
                     let bytes = r.bytes().await?;
                     let reader = config.open(bytes.reader())?;
-                    Ok(futures::stream::iter(reader).boxed())
+                    Ok(futures::stream::iter(reader)
+                        .map(|r| r.map_err(Into::into))
+                        .boxed())
                 }
             }
         }))
diff --git a/datafusion/datasource-csv/src/source.rs b/datafusion/datasource-csv/src/source.rs
index 6c994af940..8b95d9ba91 100644
--- a/datafusion/datasource-csv/src/source.rs
+++ b/datafusion/datasource-csv/src/source.rs
@@ -392,17 +392,20 @@ impl FileOpener for CsvOpener {
                     )?
                 };
 
-                Ok(futures::stream::iter(config.open(decoder)?).boxed())
+                Ok(futures::stream::iter(config.open(decoder)?)
+                    .map(|r| r.map_err(Into::into))
+                    .boxed())
             }
             GetResultPayload::Stream(s) => {
                 let decoder = config.builder().build_decoder();
                 let s = s.map_err(DataFusionError::from);
                 let input = file_compression_type.convert_stream(s.boxed())?.fuse();
 
-                Ok(deserialize_stream(
+                let stream = deserialize_stream(
                     input,
                     DecoderDeserializer::new(CsvDecoder::new(decoder)),
-                ))
+                );
+                Ok(stream.map_err(Into::into).boxed())
             }
         }
     }))
diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs
index d318928e5c..664f25525a 100644
--- a/datafusion/datasource-json/src/source.rs
+++ b/datafusion/datasource-json/src/source.rs
@@ -222,7 +222,9 @@ impl FileOpener for JsonOpener {
                         .with_batch_size(batch_size)
                         .build(BufReader::new(bytes))?;
 
-                    Ok(futures::stream::iter(reader).boxed())
+                    Ok(futures::stream::iter(reader)
+                        .map(|r| r.map_err(Into::into))
+                        .boxed())
                 }
                 GetResultPayload::Stream(s) => {
                     let s = s.map_err(DataFusionError::from);
@@ -232,10 +234,11 @@ impl FileOpener for JsonOpener {
                         .build_decoder()?;
 
                     let input = file_compression_type.convert_stream(s.boxed())?.fuse();
-                    Ok(deserialize_stream(
+                    let stream = deserialize_stream(
                         input,
                         DecoderDeserializer::new(JsonDecoder::new(decoder)),
-                    ))
+                    );
+                    Ok(stream.map_err(Into::into).boxed())
                 }
             }
         }))
diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index c078c2ef44..7ae3f83b77 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -32,7 +32,6 @@ use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use arrow::datatypes::{FieldRef, SchemaRef, TimeUnit};
-use arrow::error::ArrowError;
 use datafusion_common::encryption::FileDecryptionProperties;
 use datafusion_common::{exec_err, DataFusionError, Result};
@@ -414,8 +413,8 @@ impl FileOpener for ParquetOpener {
                 .build()?;
 
             let stream = stream
-                .map_err(|e| ArrowError::ExternalError(Box::new(e)))
-                .map(move |b| b.and_then(|b| Ok(schema_mapping.map_batch(b)?)));
+                .map_err(DataFusionError::from)
+                .map(move |b| b.and_then(|b| schema_mapping.map_batch(b)));
 
             if let Some(file_pruner) = file_pruner {
                 Ok(EarlyStoppingStream::new(
@@ -462,12 +461,9 @@ impl<S> EarlyStoppingStream<S> {
     }
 }
impl<S> EarlyStoppingStream<S>
 where
-    S: Stream<Item = Result<RecordBatch, ArrowError>> + Unpin,
+    S: Stream<Item = Result<RecordBatch>> + Unpin,
 {
-    fn check_prune(
-        &mut self,
-        input: Result<RecordBatch, ArrowError>,
-    ) -> Result<Option<RecordBatch>, ArrowError> {
+    fn check_prune(&mut self, input: Result<RecordBatch>) -> Result<Option<RecordBatch>> {
         let batch = input?;
 
         // Since dynamic filters may have been updated, see if we can stop
@@ -485,9 +481,9 @@ where
 
 impl<S> Stream for EarlyStoppingStream<S>
 where
-    S: Stream<Item = Result<RecordBatch, ArrowError>> + Unpin,
+    S: Stream<Item = Result<RecordBatch>> + Unpin,
 {
-    type Item = Result<RecordBatch, ArrowError>;
+    type Item = Result<RecordBatch>;
 
     fn poll_next(
         mut self: Pin<&mut Self>,
@@ -697,8 +693,8 @@ mod test {
     use bytes::{BufMut, BytesMut};
     use chrono::Utc;
     use datafusion_common::{
-        assert_batches_eq, record_batch, stats::Precision, ColumnStatistics, ScalarValue,
-        Statistics,
+        assert_batches_eq, record_batch, stats::Precision, ColumnStatistics,
+        DataFusionError, ScalarValue, Statistics,
     };
     use datafusion_datasource::{
         file_meta::FileMeta,
@@ -724,12 +720,8 @@ mod test {
     async fn count_batches_and_rows(
         mut stream: std::pin::Pin<
             Box<
-                dyn Stream<
-                        Item = Result<
-                            arrow::array::RecordBatch,
-                            arrow::error::ArrowError,
-                        >,
-                    > + Send,
+                dyn Stream<Item = Result<arrow::array::RecordBatch, DataFusionError>>
+                    + Send,
             >,
         >,
     ) -> (usize, usize) {
@@ -745,12 +737,8 @@ mod test {
     async fn collect_batches(
         mut stream: std::pin::Pin<
             Box<
-                dyn Stream<
-                        Item = Result<
-                            arrow::array::RecordBatch,
-                            arrow::error::ArrowError,
-                        >,
-                    > + Send,
+                dyn Stream<Item = Result<arrow::array::RecordBatch, DataFusionError>>
+                    + Send,
             >,
         >,
     ) -> Vec<arrow::array::RecordBatch> {
diff --git a/datafusion/datasource/src/file_stream.rs b/datafusion/datasource/src/file_stream.rs
index 868b980b64..54690ba496 100644
--- a/datafusion/datasource/src/file_stream.rs
+++ b/datafusion/datasource/src/file_stream.rs
@@ -37,7 +36,6 @@ use datafusion_physical_plan::metrics::{
     BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
 };
 
-use arrow::error::ArrowError;
 use arrow::record_batch::RecordBatch;
 use datafusion_common::instant::Instant;
 use datafusion_common::ScalarValue;
@@ -225,7 +224,6 @@ impl FileStream {
                     let result = self
                         .pc_projector
                         .project(batch, partition_values)
-                        .map_err(|e| ArrowError::ExternalError(e.into()))
                         .map(|batch| match &mut self.remain {
                             Some(remain) => {
                                 if *remain > batch.num_rows() {
@@ -247,7 +245,7 @@ impl FileStream {
                         self.state = FileStreamState::Error
                     }
                     self.file_stream_metrics.time_scanning_total.start();
-                    return Poll::Ready(Some(result.map_err(Into::into)));
+                    return Poll::Ready(Some(result));
                 }
                 Some(Err(err)) => {
                     self.file_stream_metrics.file_scan_errors.add(1);
@@ -281,7 +279,7 @@ impl FileStream {
                         },
                         OnError::Fail => {
                             self.state = FileStreamState::Error;
-                            return Poll::Ready(Some(Err(err.into())));
+                            return Poll::Ready(Some(Err(err)));
                         }
                     }
                 }
@@ -345,7 +343,7 @@ impl RecordBatchStream for FileStream {
 
 /// A fallible future that resolves to a stream of [`RecordBatch`]
 pub type FileOpenFuture =
-    BoxFuture<'static, Result<BoxStream<'static, Result<RecordBatch, ArrowError>>>>;
+    BoxFuture<'static, Result<BoxStream<'static, Result<RecordBatch>>>>;
 
 /// Describes the behavior of the `FileStream` if file opening or scanning fails
 pub enum OnError {
@@ -376,7 +374,7 @@ pub trait FileOpener: Unpin + Send + Sync {
 /// is ready
 pub enum NextOpen {
     Pending(FileOpenFuture),
-    Ready(Result<BoxStream<'static, Result<RecordBatch, ArrowError>>>),
+    Ready(Result<BoxStream<'static, Result<RecordBatch>>>),
 }
 
 pub enum FileStreamState {
@@ -396,7 +394,7 @@ pub enum FileStreamState {
         /// Partitioning column values for the current batch_iter
         partition_values: Vec<ScalarValue>,
         /// The reader instance
-        reader: BoxStream<'static, Result<RecordBatch, ArrowError>>,
+        reader: BoxStream<'static, Result<RecordBatch>>,
         /// A [`FileOpenFuture`] for the next file to be processed,
         /// and its corresponding partition column values, if any.
         /// This allows the next file to be opened in parallel while the
@@ -526,7 +524,6 @@ mod tests {
     use crate::file_scan_config::FileScanConfigBuilder;
     use crate::tests::make_partition;
    use crate::PartitionedFile;
-    use arrow::error::ArrowError;
     use datafusion_common::error::Result;
     use datafusion_execution::object_store::ObjectStoreUrl;
     use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
@@ -540,7 +537,7 @@ mod tests {
 
     use arrow::array::RecordBatch;
     use arrow::datatypes::Schema;
-    use datafusion_common::{assert_batches_eq, internal_err};
+    use datafusion_common::{assert_batches_eq, exec_err, internal_err};
 
     /// Test `FileOpener` which will simulate errors during file opening or scanning
     #[derive(Default)]
@@ -566,9 +563,7 @@ mod tests {
             if self.error_opening_idx.contains(&idx) {
                 Ok(futures::future::ready(internal_err!("error opening")).boxed())
             } else if self.error_scanning_idx.contains(&idx) {
-                let error = futures::future::ready(Err(ArrowError::IpcError(
-                    "error scanning".to_owned(),
-                )));
+                let error = futures::future::ready(exec_err!("error scanning"));
                 let stream = futures::stream::once(error).boxed();
                 Ok(futures::future::ready(Ok(stream)).boxed())
             } else {
diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md
index 0d4bc9c960..79be97b0d8 100644
--- a/docs/source/library-user-guide/upgrading.md
+++ b/docs/source/library-user-guide/upgrading.md
@@ -265,6 +265,26 @@ Reimplementation for any custom `DataSource` should be relatively straightforwar
 
 [#17395]: https://github.com/apache/datafusion/pull/17395/
 
+### `FileOpenFuture` now uses `DataFusionError` instead of `ArrowError`
+
+The `FileOpenFuture` type alias has been updated to use `DataFusionError` instead of `ArrowError` for its error type. This change affects the `FileOpener` trait and any implementations that work with file streaming operations.
+
+**Before:**
+
+```rust,ignore
+pub type FileOpenFuture = BoxFuture<'static, Result<BoxStream<'static, Result<RecordBatch, ArrowError>>>>;
+```
+
+**After:**
+
+```rust,ignore
+pub type FileOpenFuture = BoxFuture<'static, Result<BoxStream<'static, Result<RecordBatch>>>>;
+```
+
+If you have custom implementations of `FileOpener` or work directly with `FileOpenFuture`, you'll need to update your error handling to use `DataFusionError` instead of `ArrowError`. The `FileStreamState` enum's `Open` variant has also been updated accordingly. See [#17397] for more details.
+
+[#17397]: https://github.com/apache/datafusion/pull/17397
+
 ## DataFusion `49.0.0`
 
 ### `MSRV` updated to 1.85.1
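Editor's note (not part of the commit): for downstream crates the migration described in the upgrade guide above is mostly mechanical, and the pattern this commit applies in each opener generalizes to custom `FileOpener` implementations. The sketch below is a minimal illustration under stated assumptions: the helper name `open_from_arrow_iter` is hypothetical, and the `FileOpenFuture` alias is assumed importable from `datafusion_datasource::file_stream`, where the diff above defines it.

```rust
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use datafusion_common::Result; // Result<T, E = DataFusionError>
use datafusion_datasource::file_stream::FileOpenFuture;
use futures::{FutureExt, StreamExt};

/// Hypothetical helper: lift a blocking reader (e.g. an Arrow IPC
/// `FileReader`, which is an `Iterator<Item = Result<RecordBatch, ArrowError>>`)
/// into the new DataFusionError-based `FileOpenFuture` shape.
fn open_from_arrow_iter<I>(reader: I) -> FileOpenFuture
where
    I: Iterator<Item = Result<RecordBatch, ArrowError>> + Send + 'static,
{
    let stream = futures::stream::iter(reader)
        // The only change from the ArrowError-based version: convert each
        // item's error using the existing `From<ArrowError> for DataFusionError`.
        .map(|r| r.map_err(Into::into))
        .boxed();
    futures::future::ready(Ok(stream)).boxed()
}
```

The same `.map(|r| r.map_err(Into::into))` step appears throughout the commit (Avro, CSV, JSON, and Arrow IPC openers), since those readers all natively produce `ArrowError`s; a stream that already yields `Result<RecordBatch, DataFusionError>` only needs boxing.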