alamb commented on code in PR #6491:
URL: https://github.com/apache/arrow-datafusion/pull/6491#discussion_r1210629979


##########
datafusion/core/src/physical_plan/file_format/file_stream.rs:
##########
@@ -41,14 +41,22 @@ use crate::physical_plan::file_format::{
     FileMeta, FileScanConfig, PartitionColumnProjector,
 };
 use crate::physical_plan::metrics::{
-    BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, Time,
+    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
 };
 use crate::physical_plan::RecordBatchStream;
 
 /// A fallible future that resolves to a stream of [`RecordBatch`]
 pub type FileOpenFuture =
     BoxFuture<'static, Result<BoxStream<'static, Result<RecordBatch, 
ArrowError>>>>;
 
+/// Describes the behavior of the `FileStream` if file opening or scanning 
fails
+pub enum OnError {
+    /// Continue scanning, ignoring the failed file

Review Comment:
   these comments appear to be reversed (the are on the opposite variants)



##########
datafusion/core/src/physical_plan/file_format/file_stream.rs:
##########
@@ -239,9 +267,19 @@ impl<F: FileOpener> FileStream<F> {
             state: FileStreamState::Idle,
             file_stream_metrics: FileStreamMetrics::new(metrics, partition),
             baseline_metrics: BaselineMetrics::new(metrics, partition),
+            on_error: OnError::Fail,
         })
     }
 
+    /// Specify the behavior when an error occurs opening or scanning a file
+    ///
+    /// If `OnError::Skip` the stream will skip files which encounter and 
error and continue

Review Comment:
   ```suggestion
       /// If `OnError::Skip` the stream will skip files which encounter an 
error and continue
   ```



##########
datafusion/core/src/physical_plan/file_format/file_stream.rs:
##########
@@ -484,6 +569,256 @@ mod tests {
             .await
     }
 
+    /// Test `FileOpener` which will simulate errors during file opening or 
scanning
+    struct TestOpenerWithErrors {
+        /// Index in stream of files which should throw an error while opening
+        error_opening_idx: Vec<usize>,
+        /// Index in stream of files which should throw an error while scanning
+        error_scanning_idx: Vec<usize>,
+        /// Index of last file in stream
+        current_idx: AtomicUsize,
+        /// `RecordBatch` to return
+        records: Vec<RecordBatch>,
+    }
+
+    impl FileOpener for TestOpenerWithErrors {

Review Comment:
   this is very clever 👍 



##########
datafusion/core/src/physical_plan/file_format/file_stream.rs:
##########
@@ -484,6 +569,256 @@ mod tests {
             .await
     }
 
+    /// Test `FileOpener` which will simulate errors during file opening or 
scanning
+    struct TestOpenerWithErrors {
+        /// Index in stream of files which should throw an error while opening
+        error_opening_idx: Vec<usize>,
+        /// Index in stream of files which should throw an error while scanning
+        error_scanning_idx: Vec<usize>,
+        /// Index of last file in stream
+        current_idx: AtomicUsize,
+        /// `RecordBatch` to return
+        records: Vec<RecordBatch>,
+    }
+
+    impl FileOpener for TestOpenerWithErrors {
+        fn open(&self, _file_meta: FileMeta) -> Result<FileOpenFuture> {
+            let idx = self.current_idx.fetch_add(1, Ordering::SeqCst);
+
+            if self.error_opening_idx.contains(&idx) {
+                Ok(futures::future::ready(Err(DataFusionError::Internal(
+                    "error opening".to_owned(),
+                )))
+                .boxed())
+            } else if self.error_scanning_idx.contains(&idx) {
+                let error = futures::future::ready(Err(ArrowError::IoError(
+                    "error scanning".to_owned(),
+                )));
+                let stream = futures::stream::once(error).boxed();
+                Ok(futures::future::ready(Ok(stream)).boxed())
+            } else {
+                let iterator = self.records.clone().into_iter().map(Ok);
+                let stream = futures::stream::iter(iterator).boxed();
+                Ok(futures::future::ready(Ok(stream)).boxed())
+            }
+        }
+    }
+
+    /// helper that creates a stream of files and simulates errors during 
opening and/or scanning
+    async fn create_and_collect_with_errors(
+        on_error: OnError,
+        num_files: usize,
+        scan_errors: Vec<usize>,
+        open_errors: Vec<usize>,
+        limit: Option<usize>,
+    ) -> Result<Vec<RecordBatch>> {
+        let records = vec![make_partition(3), make_partition(2)];
+        let file_schema = records[0].schema();
+
+        let reader = TestOpenerWithErrors {
+            error_opening_idx: open_errors,
+            error_scanning_idx: scan_errors,
+            current_idx: Default::default(),
+            records,
+        };
+
+        let ctx = SessionContext::new();
+        let mock_files: Vec<(String, u64)> = (0..num_files)
+            .map(|idx| (format!("mock_file{idx}"), 10_u64))
+            .collect();
+
+        let mock_files_ref: Vec<(&str, u64)> = mock_files
+            .iter()
+            .map(|(name, size)| (name.as_str(), *size))
+            .collect();
+
+        register_test_store(&ctx, &mock_files_ref);
+
+        let file_group = mock_files
+            .into_iter()
+            .map(|(name, size)| PartitionedFile::new(name, size))
+            .collect();
+
+        let config = FileScanConfig {
+            object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
+            file_schema,
+            file_groups: vec![file_group],
+            statistics: Default::default(),
+            projection: None,
+            limit,
+            table_partition_cols: vec![],
+            output_ordering: None,
+            infinite_source: false,
+        };
+        let metrics_set = ExecutionPlanMetricsSet::new();
+        let file_stream = FileStream::new(&config, 0, reader, &metrics_set)
+            .unwrap()
+            .with_on_error(on_error);
+
+        file_stream
+            .collect::<Vec<_>>()
+            .await
+            .into_iter()
+            .collect::<Result<Vec<_>>>()
+    }
+
+    #[tokio::test]
+    async fn on_error_opening() -> Result<()> {
+        let batches =
+            create_and_collect_with_errors(OnError::Skip, 2, vec![], vec![0], 
None)
+                .await?;
+
+        #[rustfmt::skip]
+        crate::assert_batches_eq!(&[
+            "+---+",
+            "| i |",
+            "+---+",
+            "| 0 |",
+            "| 1 |",
+            "| 2 |",
+            "| 0 |",
+            "| 1 |",
+            "+---+",
+        ], &batches);
+
+        let batches =
+            create_and_collect_with_errors(OnError::Skip, 2, vec![], vec![1], 
None)
+                .await?;
+
+        #[rustfmt::skip]
+        crate::assert_batches_eq!(&[
+            "+---+",
+            "| i |",
+            "+---+",
+            "| 0 |",
+            "| 1 |",
+            "| 2 |",
+            "| 0 |",
+            "| 1 |",
+            "+---+",
+        ], &batches);
+
+        let batches =
+            create_and_collect_with_errors(OnError::Skip, 2, vec![], vec![0, 
1], None)
+                .await?;
+
+        #[rustfmt::skip]
+        crate::assert_batches_eq!(&[
+            "++",
+            "++",
+        ], &batches);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn on_error_scanning() -> Result<()> {
+        let batches =
+            create_and_collect_with_errors(OnError::Skip, 2, vec![0], vec![], 
None)
+                .await?;
+
+        #[rustfmt::skip]
+        crate::assert_batches_eq!(&[
+            "+---+",
+            "| i |",
+            "+---+",
+            "| 0 |",
+            "| 1 |",
+            "| 2 |",
+            "| 0 |",
+            "| 1 |",
+            "+---+",
+        ], &batches);
+
+        let batches =
+            create_and_collect_with_errors(OnError::Skip, 2, vec![1], vec![], 
None)
+                .await?;
+
+        #[rustfmt::skip]
+        crate::assert_batches_eq!(&[
+            "+---+",
+            "| i |",
+            "+---+",
+            "| 0 |",
+            "| 1 |",
+            "| 2 |",
+            "| 0 |",
+            "| 1 |",
+            "+---+",
+        ], &batches);
+
+        let batches =
+            create_and_collect_with_errors(OnError::Skip, 2, vec![0, 1], 
vec![], None)

Review Comment:
   I wonder if it is worth checking in these tests that the actual error is as 
generated (to verify the test is testing the correct thing and that the 
plumbing to get the errors out is working)?



##########
datafusion/core/src/physical_plan/file_format/file_stream.rs:
##########
@@ -484,6 +569,256 @@ mod tests {
             .await
     }
 
+    /// Test `FileOpener` which will simulate errors during file opening or 
scanning
+    struct TestOpenerWithErrors {
+        /// Index in stream of files which should throw an error while opening
+        error_opening_idx: Vec<usize>,
+        /// Index in stream of files which should throw an error while scanning
+        error_scanning_idx: Vec<usize>,
+        /// Index of last file in stream
+        current_idx: AtomicUsize,
+        /// `RecordBatch` to return
+        records: Vec<RecordBatch>,
+    }
+
+    impl FileOpener for TestOpenerWithErrors {
+        fn open(&self, _file_meta: FileMeta) -> Result<FileOpenFuture> {
+            let idx = self.current_idx.fetch_add(1, Ordering::SeqCst);
+
+            if self.error_opening_idx.contains(&idx) {
+                Ok(futures::future::ready(Err(DataFusionError::Internal(
+                    "error opening".to_owned(),
+                )))
+                .boxed())
+            } else if self.error_scanning_idx.contains(&idx) {
+                let error = futures::future::ready(Err(ArrowError::IoError(
+                    "error scanning".to_owned(),
+                )));
+                let stream = futures::stream::once(error).boxed();
+                Ok(futures::future::ready(Ok(stream)).boxed())
+            } else {
+                let iterator = self.records.clone().into_iter().map(Ok);
+                let stream = futures::stream::iter(iterator).boxed();
+                Ok(futures::future::ready(Ok(stream)).boxed())
+            }
+        }
+    }
+
+    /// helper that creates a stream of files and simulates errors during 
opening and/or scanning
+    async fn create_and_collect_with_errors(
+        on_error: OnError,
+        num_files: usize,
+        scan_errors: Vec<usize>,
+        open_errors: Vec<usize>,
+        limit: Option<usize>,
+    ) -> Result<Vec<RecordBatch>> {
+        let records = vec![make_partition(3), make_partition(2)];
+        let file_schema = records[0].schema();
+
+        let reader = TestOpenerWithErrors {
+            error_opening_idx: open_errors,
+            error_scanning_idx: scan_errors,
+            current_idx: Default::default(),
+            records,
+        };
+
+        let ctx = SessionContext::new();
+        let mock_files: Vec<(String, u64)> = (0..num_files)
+            .map(|idx| (format!("mock_file{idx}"), 10_u64))
+            .collect();
+
+        let mock_files_ref: Vec<(&str, u64)> = mock_files
+            .iter()
+            .map(|(name, size)| (name.as_str(), *size))
+            .collect();
+
+        register_test_store(&ctx, &mock_files_ref);
+
+        let file_group = mock_files
+            .into_iter()
+            .map(|(name, size)| PartitionedFile::new(name, size))
+            .collect();
+
+        let config = FileScanConfig {
+            object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
+            file_schema,
+            file_groups: vec![file_group],
+            statistics: Default::default(),
+            projection: None,
+            limit,
+            table_partition_cols: vec![],
+            output_ordering: None,
+            infinite_source: false,
+        };
+        let metrics_set = ExecutionPlanMetricsSet::new();
+        let file_stream = FileStream::new(&config, 0, reader, &metrics_set)
+            .unwrap()
+            .with_on_error(on_error);
+
+        file_stream
+            .collect::<Vec<_>>()
+            .await
+            .into_iter()
+            .collect::<Result<Vec<_>>>()
+    }
+
+    #[tokio::test]
+    async fn on_error_opening() -> Result<()> {
+        let batches =
+            create_and_collect_with_errors(OnError::Skip, 2, vec![], vec![0], 
None)
+                .await?;
+

Review Comment:
   A builder style for these tests might make the intent more clear and self 
documenting
   
   Perhaps something like:
   
   ```rust
               let batches = TestOpenerWithErrors::new(OnError::Skip),
                 .with_num_files(2)
                 .with_open_errors(vec![0])
                 .build()
                 .await?;
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to