This is an automated email from the ASF dual-hosted git repository.

adriangb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 3d5863b059 Use DataFusionError instead of ArrowError in FileOpenFuture (#17397)
3d5863b059 is described below

commit 3d5863b05950da382df7e070f457d8166618d270
Author: Adrian Garcia Badaracco <1755071+adria...@users.noreply.github.com>
AuthorDate: Thu Sep 4 13:23:09 2025 -0500

    Use DataFusionError instead of ArrowError in FileOpenFuture (#17397)
---
 .../src/datasource/physical_plan/arrow_file.rs     | 15 +++++----
 .../physical_optimizer/filter_pushdown/util.rs     | 12 ++------
 datafusion/datasource-avro/src/source.rs           |  8 +++--
 datafusion/datasource-csv/src/source.rs            |  9 ++++--
 datafusion/datasource-json/src/source.rs           |  9 ++++--
 datafusion/datasource-parquet/src/opener.rs        | 36 ++++++++--------------
 datafusion/datasource/src/file_stream.rs           | 19 +++++-------
 docs/source/library-user-guide/upgrading.md        | 20 ++++++++++++
 8 files changed, 69 insertions(+), 59 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs 
b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
index d0af96329b..68334a4a18 100644
--- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs
+++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -26,7 +26,7 @@ use 
datafusion_datasource::schema_adapter::SchemaAdapterFactory;
 use arrow::buffer::Buffer;
 use arrow::datatypes::SchemaRef;
 use arrow_ipc::reader::FileDecoder;
-use datafusion_common::Statistics;
+use datafusion_common::{exec_datafusion_err, Statistics};
 use datafusion_datasource::file::FileSource;
 use datafusion_datasource::file_scan_config::FileScanConfig;
 use datafusion_datasource::PartitionedFile;
@@ -140,7 +140,9 @@ impl FileOpener for ArrowOpener {
                             let arrow_reader = 
arrow::ipc::reader::FileReader::try_new(
                                 file, projection,
                             )?;
-                            Ok(futures::stream::iter(arrow_reader).boxed())
+                            Ok(futures::stream::iter(arrow_reader)
+                                .map(|r| r.map_err(Into::into))
+                                .boxed())
                         }
                         GetResultPayload::Stream(_) => {
                             let bytes = r.bytes().await?;
@@ -148,7 +150,9 @@ impl FileOpener for ArrowOpener {
                             let arrow_reader = 
arrow::ipc::reader::FileReader::try_new(
                                 cursor, projection,
                             )?;
-                            Ok(futures::stream::iter(arrow_reader).boxed())
+                            Ok(futures::stream::iter(arrow_reader)
+                                .map(|r| r.map_err(Into::into))
+                                .boxed())
                         }
                     }
                 }
@@ -179,9 +183,7 @@ impl FileOpener for ArrowOpener {
                         footer_buf[..footer_len].try_into().unwrap(),
                     )
                     .map_err(|err| {
-                        arrow::error::ArrowError::ParseError(format!(
-                            "Unable to get root as footer: {err:?}"
-                        ))
+                        exec_datafusion_err!("Unable to get root as footer: 
{err:?}")
                     })?;
                     // build decoder according to footer & projection
                     let schema =
@@ -248,6 +250,7 @@ impl FileOpener for ArrowOpener {
                                     .transpose()
                             }),
                     )
+                    .map(|r| r.map_err(Into::into))
                     .boxed())
                 }
             }
diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs 
b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
index 64cee011cc..7d0020b2e9 100644
--- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use arrow::datatypes::SchemaRef;
-use arrow::error::ArrowError;
 use arrow::{array::RecordBatch, compute::concat_batches};
 use datafusion::{datasource::object_store::ObjectStoreUrl, 
physical_plan::PhysicalExpr};
 use datafusion_common::{config::ConfigOptions, internal_err, Result, 
Statistics};
@@ -39,7 +38,7 @@ use datafusion_physical_plan::{
     metrics::ExecutionPlanMetricsSet,
     DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
 };
-use futures::stream::BoxStream;
+use futures::StreamExt;
 use futures::{FutureExt, Stream};
 use object_store::ObjectStore;
 use std::{
@@ -93,12 +92,7 @@ impl FileOpener for TestOpener {
 
         let stream = TestStream::new(batches);
 
-        Ok((async {
-            let stream: BoxStream<'static, Result<RecordBatch, ArrowError>> =
-                Box::pin(stream);
-            Ok(stream)
-        })
-        .boxed())
+        Ok((async { Ok(stream.boxed()) }).boxed())
     }
 }
 
@@ -344,7 +338,7 @@ impl TestStream {
 }
 
 impl Stream for TestStream {
-    type Item = Result<RecordBatch, ArrowError>;
+    type Item = Result<RecordBatch>;
 
     fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> 
Poll<Option<Self::Item>> {
         let next_batch = self.index.value();
diff --git a/datafusion/datasource-avro/src/source.rs 
b/datafusion/datasource-avro/src/source.rs
index 948049f5a7..da871837cd 100644
--- a/datafusion/datasource-avro/src/source.rs
+++ b/datafusion/datasource-avro/src/source.rs
@@ -169,12 +169,16 @@ mod private {
                 match r.payload {
                     GetResultPayload::File(file, _) => {
                         let reader = config.open(file)?;
-                        Ok(futures::stream::iter(reader).boxed())
+                        Ok(futures::stream::iter(reader)
+                            .map(|r| r.map_err(Into::into))
+                            .boxed())
                     }
                     GetResultPayload::Stream(_) => {
                         let bytes = r.bytes().await?;
                         let reader = config.open(bytes.reader())?;
-                        Ok(futures::stream::iter(reader).boxed())
+                        Ok(futures::stream::iter(reader)
+                            .map(|r| r.map_err(Into::into))
+                            .boxed())
                     }
                 }
             }))
diff --git a/datafusion/datasource-csv/src/source.rs 
b/datafusion/datasource-csv/src/source.rs
index 6c994af940..8b95d9ba91 100644
--- a/datafusion/datasource-csv/src/source.rs
+++ b/datafusion/datasource-csv/src/source.rs
@@ -392,17 +392,20 @@ impl FileOpener for CsvOpener {
                         )?
                     };
 
-                    Ok(futures::stream::iter(config.open(decoder)?).boxed())
+                    Ok(futures::stream::iter(config.open(decoder)?)
+                        .map(|r| r.map_err(Into::into))
+                        .boxed())
                 }
                 GetResultPayload::Stream(s) => {
                     let decoder = config.builder().build_decoder();
                     let s = s.map_err(DataFusionError::from);
                     let input = 
file_compression_type.convert_stream(s.boxed())?.fuse();
 
-                    Ok(deserialize_stream(
+                    let stream = deserialize_stream(
                         input,
                         DecoderDeserializer::new(CsvDecoder::new(decoder)),
-                    ))
+                    );
+                    Ok(stream.map_err(Into::into).boxed())
                 }
             }
         }))
diff --git a/datafusion/datasource-json/src/source.rs 
b/datafusion/datasource-json/src/source.rs
index d318928e5c..664f25525a 100644
--- a/datafusion/datasource-json/src/source.rs
+++ b/datafusion/datasource-json/src/source.rs
@@ -222,7 +222,9 @@ impl FileOpener for JsonOpener {
                         .with_batch_size(batch_size)
                         .build(BufReader::new(bytes))?;
 
-                    Ok(futures::stream::iter(reader).boxed())
+                    Ok(futures::stream::iter(reader)
+                        .map(|r| r.map_err(Into::into))
+                        .boxed())
                 }
                 GetResultPayload::Stream(s) => {
                     let s = s.map_err(DataFusionError::from);
@@ -232,10 +234,11 @@ impl FileOpener for JsonOpener {
                         .build_decoder()?;
                     let input = 
file_compression_type.convert_stream(s.boxed())?.fuse();
 
-                    Ok(deserialize_stream(
+                    let stream = deserialize_stream(
                         input,
                         DecoderDeserializer::new(JsonDecoder::new(decoder)),
-                    ))
+                    );
+                    Ok(stream.map_err(Into::into).boxed())
                 }
             }
         }))
diff --git a/datafusion/datasource-parquet/src/opener.rs 
b/datafusion/datasource-parquet/src/opener.rs
index c078c2ef44..7ae3f83b77 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -32,7 +32,6 @@ use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use arrow::datatypes::{FieldRef, SchemaRef, TimeUnit};
-use arrow::error::ArrowError;
 use datafusion_common::encryption::FileDecryptionProperties;
 
 use datafusion_common::{exec_err, DataFusionError, Result};
@@ -414,8 +413,8 @@ impl FileOpener for ParquetOpener {
                 .build()?;
 
             let stream = stream
-                .map_err(|e| ArrowError::ExternalError(Box::new(e)))
-                .map(move |b| b.and_then(|b| 
Ok(schema_mapping.map_batch(b)?)));
+                .map_err(DataFusionError::from)
+                .map(move |b| b.and_then(|b| schema_mapping.map_batch(b)));
 
             if let Some(file_pruner) = file_pruner {
                 Ok(EarlyStoppingStream::new(
@@ -462,12 +461,9 @@ impl<S> EarlyStoppingStream<S> {
 }
 impl<S> EarlyStoppingStream<S>
 where
-    S: Stream<Item = Result<RecordBatch, ArrowError>> + Unpin,
+    S: Stream<Item = Result<RecordBatch>> + Unpin,
 {
-    fn check_prune(
-        &mut self,
-        input: Result<RecordBatch, ArrowError>,
-    ) -> Result<Option<RecordBatch>, ArrowError> {
+    fn check_prune(&mut self, input: Result<RecordBatch>) -> 
Result<Option<RecordBatch>> {
         let batch = input?;
 
         // Since dynamic filters may have been updated, see if we can stop
@@ -485,9 +481,9 @@ where
 
 impl<S> Stream for EarlyStoppingStream<S>
 where
-    S: Stream<Item = Result<RecordBatch, ArrowError>> + Unpin,
+    S: Stream<Item = Result<RecordBatch>> + Unpin,
 {
-    type Item = Result<RecordBatch, ArrowError>;
+    type Item = Result<RecordBatch>;
 
     fn poll_next(
         mut self: Pin<&mut Self>,
@@ -697,8 +693,8 @@ mod test {
     use bytes::{BufMut, BytesMut};
     use chrono::Utc;
     use datafusion_common::{
-        assert_batches_eq, record_batch, stats::Precision, ColumnStatistics, 
ScalarValue,
-        Statistics,
+        assert_batches_eq, record_batch, stats::Precision, ColumnStatistics,
+        DataFusionError, ScalarValue, Statistics,
     };
     use datafusion_datasource::{
         file_meta::FileMeta,
@@ -724,12 +720,8 @@ mod test {
     async fn count_batches_and_rows(
         mut stream: std::pin::Pin<
             Box<
-                dyn Stream<
-                        Item = Result<
-                            arrow::array::RecordBatch,
-                            arrow::error::ArrowError,
-                        >,
-                    > + Send,
+                dyn Stream<Item = Result<arrow::array::RecordBatch, 
DataFusionError>>
+                    + Send,
             >,
         >,
     ) -> (usize, usize) {
@@ -745,12 +737,8 @@ mod test {
     async fn collect_batches(
         mut stream: std::pin::Pin<
             Box<
-                dyn Stream<
-                        Item = Result<
-                            arrow::array::RecordBatch,
-                            arrow::error::ArrowError,
-                        >,
-                    > + Send,
+                dyn Stream<Item = Result<arrow::array::RecordBatch, 
DataFusionError>>
+                    + Send,
             >,
         >,
     ) -> Vec<arrow::array::RecordBatch> {
diff --git a/datafusion/datasource/src/file_stream.rs 
b/datafusion/datasource/src/file_stream.rs
index 868b980b64..54690ba496 100644
--- a/datafusion/datasource/src/file_stream.rs
+++ b/datafusion/datasource/src/file_stream.rs
@@ -37,7 +37,6 @@ use datafusion_physical_plan::metrics::{
     BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
 };
 
-use arrow::error::ArrowError;
 use arrow::record_batch::RecordBatch;
 use datafusion_common::instant::Instant;
 use datafusion_common::ScalarValue;
@@ -225,7 +224,6 @@ impl FileStream {
                             let result = self
                                 .pc_projector
                                 .project(batch, partition_values)
-                                .map_err(|e| 
ArrowError::ExternalError(e.into()))
                                 .map(|batch| match &mut self.remain {
                                     Some(remain) => {
                                         if *remain > batch.num_rows() {
@@ -247,7 +245,7 @@ impl FileStream {
                                 self.state = FileStreamState::Error
                             }
                             
self.file_stream_metrics.time_scanning_total.start();
-                            return 
Poll::Ready(Some(result.map_err(Into::into)));
+                            return Poll::Ready(Some(result));
                         }
                         Some(Err(err)) => {
                             self.file_stream_metrics.file_scan_errors.add(1);
@@ -281,7 +279,7 @@ impl FileStream {
                                 },
                                 OnError::Fail => {
                                     self.state = FileStreamState::Error;
-                                    return Poll::Ready(Some(Err(err.into())));
+                                    return Poll::Ready(Some(Err(err)));
                                 }
                             }
                         }
@@ -345,7 +343,7 @@ impl RecordBatchStream for FileStream {
 
 /// A fallible future that resolves to a stream of [`RecordBatch`]
 pub type FileOpenFuture =
-    BoxFuture<'static, Result<BoxStream<'static, Result<RecordBatch, 
ArrowError>>>>;
+    BoxFuture<'static, Result<BoxStream<'static, Result<RecordBatch>>>>;
 
 /// Describes the behavior of the `FileStream` if file opening or scanning 
fails
 pub enum OnError {
@@ -376,7 +374,7 @@ pub trait FileOpener: Unpin + Send + Sync {
 /// is ready
 pub enum NextOpen {
     Pending(FileOpenFuture),
-    Ready(Result<BoxStream<'static, Result<RecordBatch, ArrowError>>>),
+    Ready(Result<BoxStream<'static, Result<RecordBatch>>>),
 }
 
 pub enum FileStreamState {
@@ -396,7 +394,7 @@ pub enum FileStreamState {
         /// Partitioning column values for the current batch_iter
         partition_values: Vec<ScalarValue>,
         /// The reader instance
-        reader: BoxStream<'static, Result<RecordBatch, ArrowError>>,
+        reader: BoxStream<'static, Result<RecordBatch>>,
         /// A [`FileOpenFuture`] for the next file to be processed,
         /// and its corresponding partition column values, if any.
         /// This allows the next file to be opened in parallel while the
@@ -526,7 +524,6 @@ mod tests {
     use crate::file_scan_config::FileScanConfigBuilder;
     use crate::tests::make_partition;
     use crate::PartitionedFile;
-    use arrow::error::ArrowError;
     use datafusion_common::error::Result;
     use datafusion_execution::object_store::ObjectStoreUrl;
     use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
@@ -540,7 +537,7 @@ mod tests {
     use arrow::array::RecordBatch;
     use arrow::datatypes::Schema;
 
-    use datafusion_common::{assert_batches_eq, internal_err};
+    use datafusion_common::{assert_batches_eq, exec_err, internal_err};
 
     /// Test `FileOpener` which will simulate errors during file opening or 
scanning
     #[derive(Default)]
@@ -566,9 +563,7 @@ mod tests {
             if self.error_opening_idx.contains(&idx) {
                 Ok(futures::future::ready(internal_err!("error 
opening")).boxed())
             } else if self.error_scanning_idx.contains(&idx) {
-                let error = futures::future::ready(Err(ArrowError::IpcError(
-                    "error scanning".to_owned(),
-                )));
+                let error = futures::future::ready(exec_err!("error 
scanning"));
                 let stream = futures::stream::once(error).boxed();
                 Ok(futures::future::ready(Ok(stream)).boxed())
             } else {
diff --git a/docs/source/library-user-guide/upgrading.md 
b/docs/source/library-user-guide/upgrading.md
index 0d4bc9c960..79be97b0d8 100644
--- a/docs/source/library-user-guide/upgrading.md
+++ b/docs/source/library-user-guide/upgrading.md
@@ -265,6 +265,26 @@ Reimplementation for any custom `DataSource` should be 
relatively straightforwar
 
 [#17395]: https://github.com/apache/datafusion/pull/17395/
 
+### `FileOpenFuture` now uses `DataFusionError` instead of `ArrowError`
+
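+The `FileOpenFuture` type alias has been updated so that the `RecordBatch` stream it resolves to yields `DataFusionError` instead of `ArrowError` (i.e. each item is now `Result<RecordBatch>` rather than `Result<RecordBatch, ArrowError>`). This change affects the `FileOpener` trait and any implementations that work with file streaming operations.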
+
+**Before:**
+
+```rust,ignore
+pub type FileOpenFuture = BoxFuture<'static, Result<BoxStream<'static, 
Result<RecordBatch, ArrowError>>>>;
+```
+
+**After:**
+
+```rust,ignore
+pub type FileOpenFuture = BoxFuture<'static, Result<BoxStream<'static, 
Result<RecordBatch>>>>;
+```
+
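+If you have custom implementations of `FileOpener` or work directly with `FileOpenFuture`, you'll need to update your error handling to use `DataFusionError` instead of `ArrowError`. A stream that still yields `Result<RecordBatch, ArrowError>` (for example, one built from an Arrow reader iterator) can be adapted with `.map(|r| r.map_err(Into::into))` before calling `.boxed()`. The `reader` field of `FileStreamState` and the `NextOpen::Ready` variant have also been updated accordingly. See [#17397] for more details.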
+
+[#17397]: https://github.com/apache/datafusion/pull/17397
+
 ## DataFusion `49.0.0`
 
 ### `MSRV` updated to 1.85.1


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to