This is an automated email from the ASF dual-hosted git repository.

houqp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 4fa7e64  [test] added test object store (#1124)
4fa7e64 is described below

commit 4fa7e6465eb61d5adcc054adef78cf6da99847bd
Author: rdettai <[email protected]>
AuthorDate: Sun Oct 17 05:22:41 2021 +0200

    [test] added test object store (#1124)
---
 datafusion/src/datasource/listing.rs             | 149 +++++++++++------------
 datafusion/src/physical_optimizer/repartition.rs |   3 +-
 datafusion/src/test/mod.rs                       |   1 +
 datafusion/src/test/object_store.rs              | 122 +++++++++++++++++++
 4 files changed, 198 insertions(+), 77 deletions(-)
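
Summary of the change: the listing.rs tests previously embedded a one-off MockObjectStore; this commit extracts a reusable TestObjectStore into datafusion/src/test/object_store.rs and switches the listing and repartition tests over to it. A minimal usage sketch with names taken from the diff below (the surrounding test scaffolding is assumed, not part of this commit):

    use crate::test::object_store::TestObjectStore;

    // A store that pretends to hold two 10-byte files under a common prefix.
    let store = TestObjectStore::new_arc(&[
        ("bucket/key-prefix/file0", 10),
        ("bucket/key-prefix/file1", 10),
    ]);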

diff --git a/datafusion/src/datasource/listing.rs b/datafusion/src/datasource/listing.rs
index 585a40f..4af82d0 100644
--- a/datafusion/src/datasource/listing.rs
+++ b/datafusion/src/datasource/listing.rs
@@ -283,16 +283,12 @@ fn split_files(
 
 #[cfg(test)]
 mod tests {
-    use std::io::Read;
-
-    use futures::AsyncRead;
-
-    use crate::datasource::{
-        file_format::{avro::AvroFormat, parquet::ParquetFormat},
-        object_store::{
-            local::LocalFileSystem, FileMeta, FileMetaStream, ListEntryStream,
-            ObjectReader, ObjectStore, SizedFile,
+    use crate::{
+        datasource::{
+            file_format::{avro::AvroFormat, parquet::ParquetFormat},
+            object_store::{local::LocalFileSystem, FileMeta, ObjectStore, SizedFile},
         },
+        test::object_store::TestObjectStore,
     };
 
     use super::*;
@@ -363,10 +359,67 @@ mod tests {
 
     #[tokio::test]
     async fn file_listings() -> Result<()> {
-        assert_partitioning(5, 12, 5).await?;
-        assert_partitioning(4, 4, 4).await?;
-        assert_partitioning(5, 2, 2).await?;
-        assert_partitioning(0, 2, 0).await.expect_err("no files");
+        // more expected partitions than files
+        assert_partitioning(
+            &[
+                "bucket/key-prefix/file0",
+                "bucket/key-prefix/file1",
+                "bucket/key-prefix/file2",
+                "bucket/key-prefix/file3",
+                "bucket/key-prefix/file4",
+            ],
+            "bucket/key-prefix/",
+            12,
+            5,
+        )
+        .await?;
+
+        // as many expected partitions as files
+        assert_partitioning(
+            &[
+                "bucket/key-prefix/file0",
+                "bucket/key-prefix/file1",
+                "bucket/key-prefix/file2",
+                "bucket/key-prefix/file3",
+            ],
+            "bucket/key-prefix/",
+            4,
+            4,
+        )
+        .await?;
+
+        // more files than expected partitions
+        assert_partitioning(
+            &[
+                "bucket/key-prefix/file0",
+                "bucket/key-prefix/file1",
+                "bucket/key-prefix/file2",
+                "bucket/key-prefix/file3",
+                "bucket/key-prefix/file4",
+            ],
+            "bucket/key-prefix/",
+            2,
+            2,
+        )
+        .await?;
+
+        // no files
+        assert_partitioning(&[], "bucket/key-prefix/", 2, 0)
+            .await
+            .expect_err("no files");
+
+        // files that don't match the prefix
+        assert_partitioning(
+            &[
+                "bucket/key-prefix/file0",
+                "bucket/key-prefix/file1",
+                "bucket/other-prefix/roguefile",
+            ],
+            "bucket/key-prefix/",
+            10,
+            2,
+        )
+        .await?;
         Ok(())
     }
 
@@ -390,13 +443,16 @@ mod tests {
         Ok(Arc::new(table))
     }
 
+    /// Check that the files listed by the table match the specified `output_partitioning`
+    /// when the object store contains `files`.
     async fn assert_partitioning(
-        files_in_folder: usize,
+        files: &[&str],
+        table_prefix: &str,
         target_partitions: usize,
         output_partitioning: usize,
     ) -> Result<()> {
         let mock_store: Arc<dyn ObjectStore> =
-            Arc::new(MockObjectStore { files_in_folder });
+            TestObjectStore::new_arc(&files.iter().map(|f| (*f, 10)).collect::<Vec<_>>());
 
         let format = AvroFormat {};
 
@@ -412,76 +468,17 @@ mod tests {
 
         let table = ListingTable::new(
             Arc::clone(&mock_store),
-            "bucket/key-prefix".to_owned(),
+            table_prefix.to_owned(),
             Arc::new(schema),
             opt,
         );
 
         let (file_list, _) = table
-            .list_files_for_scan(mock_store, "bucket/key-prefix", &[], None)
+            .list_files_for_scan(mock_store, table_prefix, &[], None)
             .await?;
 
         assert_eq!(file_list.len(), output_partitioning);
 
         Ok(())
     }
-
-    #[derive(Debug)]
-    struct MockObjectStore {
-        pub files_in_folder: usize,
-    }
-
-    #[async_trait]
-    impl ObjectStore for MockObjectStore {
-        async fn list_file(&self, prefix: &str) -> Result<FileMetaStream> {
-            let prefix = prefix.to_owned();
-            let files = (0..self.files_in_folder).map(move |i| {
-                Ok(FileMeta {
-                    sized_file: SizedFile {
-                        path: format!("{}file{}", prefix, i),
-                        size: 100,
-                    },
-                    last_modified: None,
-                })
-            });
-            Ok(Box::pin(futures::stream::iter(files)))
-        }
-
-        async fn list_dir(
-            &self,
-            _prefix: &str,
-            _delimiter: Option<String>,
-        ) -> Result<ListEntryStream> {
-            unimplemented!()
-        }
-
-        fn file_reader(&self, _file: SizedFile) -> Result<Arc<dyn ObjectReader>> {
-            Ok(Arc::new(MockObjectReader {}))
-        }
-    }
-
-    struct MockObjectReader {}
-
-    #[async_trait]
-    impl ObjectReader for MockObjectReader {
-        async fn chunk_reader(
-            &self,
-            _start: u64,
-            _length: usize,
-        ) -> Result<Box<dyn AsyncRead>> {
-            unimplemented!()
-        }
-
-        fn sync_chunk_reader(
-            &self,
-            _start: u64,
-            _length: usize,
-        ) -> Result<Box<dyn Read + Send + Sync>> {
-            unimplemented!()
-        }
-
-        fn length(&self) -> u64 {
-            unimplemented!()
-        }
-    }
 }
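
The refactored assert_partitioning helper takes the fake file paths and the table prefix explicitly instead of a bare file count, so each test case now spells out the store contents. A sketch of a call, mirroring the last test case above (the async test context and `?` error propagation are assumed):

    // Only the 2 files under the prefix should be listed, so the scan ends up
    // with 2 partitions even though 10 target partitions were requested.
    assert_partitioning(
        &[
            "bucket/key-prefix/file0",
            "bucket/key-prefix/file1",
            "bucket/other-prefix/roguefile",
        ],
        "bucket/key-prefix/",
        10, // target_partitions
        2,  // expected output partitioning
    )
    .await?;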
diff --git a/datafusion/src/physical_optimizer/repartition.rs b/datafusion/src/physical_optimizer/repartition.rs
index af47d86..ea7de7f 100644
--- a/datafusion/src/physical_optimizer/repartition.rs
+++ b/datafusion/src/physical_optimizer/repartition.rs
@@ -114,6 +114,7 @@ mod tests {
     use crate::physical_plan::file_format::ParquetExec;
     use crate::physical_plan::projection::ProjectionExec;
     use crate::physical_plan::Statistics;
+    use crate::test::object_store::TestObjectStore;
 
     #[test]
     fn added_repartition_to_single_partition() -> Result<()> {
@@ -121,7 +122,7 @@ mod tests {
         let parquet_project = ProjectionExec::try_new(
             vec![],
             Arc::new(ParquetExec::new(
-                Arc::new(LocalFileSystem {}),
+                TestObjectStore::new_arc(&[("x", 100)]),
                 vec![vec![PartitionedFile::new("x".to_string(), 100)]],
                 Statistics::default(),
                 schema,
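
Here the test only needs a placeholder store whose single entry matches the PartitionedFile declared next to it. The same pattern generalizes; a sketch, assuming ObjectStore and Arc are in scope as in the surrounding test module:

    // One fake 100-byte file named "x"; Arc<TestObjectStore> coerces to the
    // Arc<dyn ObjectStore> trait object expected at the call site.
    let store: Arc<dyn ObjectStore> = TestObjectStore::new_arc(&[("x", 100)]);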
diff --git a/datafusion/src/test/mod.rs b/datafusion/src/test/mod.rs
index c6eae35..4ad4722 100644
--- a/datafusion/src/test/mod.rs
+++ b/datafusion/src/test/mod.rs
@@ -303,5 +303,6 @@ pub fn assert_is_pending<'a, T>(fut: &mut Pin<Box<dyn Future<Output = T> + Send
 }
 
 pub mod exec;
+pub mod object_store;
 pub mod user_defined;
 pub mod variable;
diff --git a/datafusion/src/test/object_store.rs b/datafusion/src/test/object_store.rs
new file mode 100644
index 0000000..4020b99
--- /dev/null
+++ b/datafusion/src/test/object_store.rs
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Object store implementation used for testing
+
+use std::{
+    io,
+    io::{Cursor, Read},
+    sync::Arc,
+};
+
+use crate::{
+    datasource::object_store::{
+        FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore, SizedFile,
+    },
+    error::{DataFusionError, Result},
+};
+use async_trait::async_trait;
+use futures::{stream, AsyncRead, StreamExt};
+
+#[derive(Debug)]
+/// An object store implementation that is useful for testing.
+/// The `ObjectReader`s are filled with zero bytes.
+pub struct TestObjectStore {
+    /// The `(path,size)` of the files that "exist" in the store
+    pub files: Vec<(String, u64)>,
+}
+
+impl TestObjectStore {
+    pub fn new_arc(files: &[(&str, u64)]) -> Arc<Self> {
+        Arc::new(Self {
+            files: files.iter().map(|f| (f.0.to_owned(), f.1)).collect(),
+        })
+    }
+}
+
+#[async_trait]
+impl ObjectStore for TestObjectStore {
+    async fn list_file(&self, prefix: &str) -> Result<FileMetaStream> {
+        let prefix = prefix.to_owned();
+        Ok(Box::pin(
+            stream::iter(
+                self.files
+                    .clone()
+                    .into_iter()
+                    .filter(move |f| f.0.starts_with(&prefix)),
+            )
+            .map(|f| {
+                Ok(FileMeta {
+                    sized_file: SizedFile {
+                        path: f.0.clone(),
+                        size: f.1,
+                    },
+                    last_modified: None,
+                })
+            }),
+        ))
+    }
+
+    async fn list_dir(
+        &self,
+        _prefix: &str,
+        _delimiter: Option<String>,
+    ) -> Result<ListEntryStream> {
+        unimplemented!()
+    }
+
+    fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>> {
+        match self.files.iter().find(|item| file.path == item.0) {
+            Some((_, size)) if *size == file.size => {
+                Ok(Arc::new(EmptyObjectReader(*size)))
+            }
+            Some(_) => Err(DataFusionError::IoError(io::Error::new(
+                io::ErrorKind::NotFound,
+                "found in test list but wrong size",
+            ))),
+            None => Err(DataFusionError::IoError(io::Error::new(
+                io::ErrorKind::NotFound,
+                "not in provided test list",
+            ))),
+        }
+    }
+}
+
+struct EmptyObjectReader(u64);
+
+#[async_trait]
+impl ObjectReader for EmptyObjectReader {
+    async fn chunk_reader(
+        &self,
+        _start: u64,
+        _length: usize,
+    ) -> Result<Box<dyn AsyncRead>> {
+        unimplemented!()
+    }
+
+    fn sync_chunk_reader(
+        &self,
+        _start: u64,
+        _length: usize,
+    ) -> Result<Box<dyn Read + Send + Sync>> {
+        Ok(Box::new(Cursor::new(vec![0; self.0 as usize])))
+    }
+
+    fn length(&self) -> u64 {
+        self.0
+    }
+}
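
To round out the picture, a short end-to-end sketch of how the new store behaves (assuming a tokio test context and the imports shown above; the assertions are illustrative and not part of this commit):

    let store = TestObjectStore::new_arc(&[("bucket/foo", 6), ("other/bar", 8)]);

    // list_file only yields the entries matching the prefix.
    let files: Vec<_> = store.list_file("bucket/").await?.collect().await;
    assert_eq!(files.len(), 1);

    // file_reader validates the path and size against the registered list...
    let reader = store.file_reader(SizedFile {
        path: "bucket/foo".to_owned(),
        size: 6,
    })?;
    // ...and the resulting reader serves zero bytes of the declared length.
    assert_eq!(reader.length(), 6);
    let mut buf = Vec::new();
    reader.sync_chunk_reader(0, 6)?.read_to_end(&mut buf)?;
    assert_eq!(buf, vec![0u8; 6]);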
