thinkharderdev commented on code in PR #6491:
URL: https://github.com/apache/arrow-datafusion/pull/6491#discussion_r1210787534


##########
datafusion/core/src/physical_plan/file_format/file_stream.rs:
##########
@@ -484,6 +569,256 @@ mod tests {
             .await
     }
 
+    /// Test `FileOpener` which will simulate errors during file opening or 
scanning
+    struct TestOpenerWithErrors {
+        /// Index in stream of files which should throw an error while opening
+        error_opening_idx: Vec<usize>,
+        /// Index in stream of files which should throw an error while scanning
+        error_scanning_idx: Vec<usize>,
+        /// Index of last file in stream
+        current_idx: AtomicUsize,
+        /// `RecordBatch` to return
+        records: Vec<RecordBatch>,
+    }
+
+    impl FileOpener for TestOpenerWithErrors {
+        fn open(&self, _file_meta: FileMeta) -> Result<FileOpenFuture> {
+            let idx = self.current_idx.fetch_add(1, Ordering::SeqCst);
+
+            if self.error_opening_idx.contains(&idx) {
+                Ok(futures::future::ready(Err(DataFusionError::Internal(
+                    "error opening".to_owned(),
+                )))
+                .boxed())
+            } else if self.error_scanning_idx.contains(&idx) {
+                let error = futures::future::ready(Err(ArrowError::IoError(
+                    "error scanning".to_owned(),
+                )));
+                let stream = futures::stream::once(error).boxed();
+                Ok(futures::future::ready(Ok(stream)).boxed())
+            } else {
+                let iterator = self.records.clone().into_iter().map(Ok);
+                let stream = futures::stream::iter(iterator).boxed();
+                Ok(futures::future::ready(Ok(stream)).boxed())
+            }
+        }
+    }
+
+    /// helper that creates a stream of files and simulates errors during 
opening and/or scanning
+    async fn create_and_collect_with_errors(
+        on_error: OnError,
+        num_files: usize,
+        scan_errors: Vec<usize>,
+        open_errors: Vec<usize>,
+        limit: Option<usize>,
+    ) -> Result<Vec<RecordBatch>> {
+        let records = vec![make_partition(3), make_partition(2)];
+        let file_schema = records[0].schema();
+
+        let reader = TestOpenerWithErrors {
+            error_opening_idx: open_errors,
+            error_scanning_idx: scan_errors,
+            current_idx: Default::default(),
+            records,
+        };
+
+        let ctx = SessionContext::new();
+        let mock_files: Vec<(String, u64)> = (0..num_files)
+            .map(|idx| (format!("mock_file{idx}"), 10_u64))
+            .collect();
+
+        let mock_files_ref: Vec<(&str, u64)> = mock_files
+            .iter()
+            .map(|(name, size)| (name.as_str(), *size))
+            .collect();
+
+        register_test_store(&ctx, &mock_files_ref);
+
+        let file_group = mock_files
+            .into_iter()
+            .map(|(name, size)| PartitionedFile::new(name, size))
+            .collect();
+
+        let config = FileScanConfig {
+            object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
+            file_schema,
+            file_groups: vec![file_group],
+            statistics: Default::default(),
+            projection: None,
+            limit,
+            table_partition_cols: vec![],
+            output_ordering: None,
+            infinite_source: false,
+        };
+        let metrics_set = ExecutionPlanMetricsSet::new();
+        let file_stream = FileStream::new(&config, 0, reader, &metrics_set)
+            .unwrap()
+            .with_on_error(on_error);
+
+        file_stream
+            .collect::<Vec<_>>()
+            .await
+            .into_iter()
+            .collect::<Result<Vec<_>>>()
+    }
+
+    #[tokio::test]
+    async fn on_error_opening() -> Result<()> {
+        let batches =
+            create_and_collect_with_errors(OnError::Skip, 2, vec![], vec![0], 
None)
+                .await?;
+
+        #[rustfmt::skip]
+        crate::assert_batches_eq!(&[
+            "+---+",
+            "| i |",
+            "+---+",
+            "| 0 |",
+            "| 1 |",
+            "| 2 |",
+            "| 0 |",
+            "| 1 |",
+            "+---+",
+        ], &batches);
+
+        let batches =
+            create_and_collect_with_errors(OnError::Skip, 2, vec![], vec![1], 
None)
+                .await?;
+
+        #[rustfmt::skip]
+        crate::assert_batches_eq!(&[
+            "+---+",
+            "| i |",
+            "+---+",
+            "| 0 |",
+            "| 1 |",
+            "| 2 |",
+            "| 0 |",
+            "| 1 |",
+            "+---+",
+        ], &batches);
+
+        let batches =
+            create_and_collect_with_errors(OnError::Skip, 2, vec![], vec![0, 
1], None)
+                .await?;
+
+        #[rustfmt::skip]
+        crate::assert_batches_eq!(&[
+            "++",
+            "++",
+        ], &batches);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn on_error_scanning() -> Result<()> {
+        let batches =
+            create_and_collect_with_errors(OnError::Skip, 2, vec![0], vec![], 
None)
+                .await?;
+
+        #[rustfmt::skip]
+        crate::assert_batches_eq!(&[
+            "+---+",
+            "| i |",
+            "+---+",
+            "| 0 |",
+            "| 1 |",
+            "| 2 |",
+            "| 0 |",
+            "| 1 |",
+            "+---+",
+        ], &batches);
+
+        let batches =
+            create_and_collect_with_errors(OnError::Skip, 2, vec![1], vec![], 
None)
+                .await?;
+
+        #[rustfmt::skip]
+        crate::assert_batches_eq!(&[
+            "+---+",
+            "| i |",
+            "+---+",
+            "| 0 |",
+            "| 1 |",
+            "| 2 |",
+            "| 0 |",
+            "| 1 |",
+            "+---+",
+        ], &batches);
+
+        let batches =
+            create_and_collect_with_errors(OnError::Skip, 2, vec![0, 1], 
vec![], None)

Review Comment:
   I think it's probably effectively covered by verifying the output, but added 
explicit test cases for `OnError::Fail` just to cover our bases. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to