This is an automated email from the ASF dual-hosted git repository.
houqp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 4fa7e64 [test] added test object store (#1124)
4fa7e64 is described below
commit 4fa7e6465eb61d5adcc054adef78cf6da99847bd
Author: rdettai <[email protected]>
AuthorDate: Sun Oct 17 05:22:41 2021 +0200
[test] added test object store (#1124)
---
datafusion/src/datasource/listing.rs | 149 +++++++++++------------
datafusion/src/physical_optimizer/repartition.rs | 3 +-
datafusion/src/test/mod.rs | 1 +
datafusion/src/test/object_store.rs | 122 +++++++++++++++++++
4 files changed, 198 insertions(+), 77 deletions(-)
diff --git a/datafusion/src/datasource/listing.rs b/datafusion/src/datasource/listing.rs
index 585a40f..4af82d0 100644
--- a/datafusion/src/datasource/listing.rs
+++ b/datafusion/src/datasource/listing.rs
@@ -283,16 +283,12 @@ fn split_files(
#[cfg(test)]
mod tests {
- use std::io::Read;
-
- use futures::AsyncRead;
-
- use crate::datasource::{
- file_format::{avro::AvroFormat, parquet::ParquetFormat},
- object_store::{
- local::LocalFileSystem, FileMeta, FileMetaStream, ListEntryStream,
- ObjectReader, ObjectStore, SizedFile,
+ use crate::{
+ datasource::{
+ file_format::{avro::AvroFormat, parquet::ParquetFormat},
+ object_store::{local::LocalFileSystem, FileMeta, ObjectStore, SizedFile},
},
+ test::object_store::TestObjectStore,
};
use super::*;
@@ -363,10 +359,67 @@ mod tests {
#[tokio::test]
async fn file_listings() -> Result<()> {
- assert_partitioning(5, 12, 5).await?;
- assert_partitioning(4, 4, 4).await?;
- assert_partitioning(5, 2, 2).await?;
- assert_partitioning(0, 2, 0).await.expect_err("no files");
+ // more expected partitions than files
+ assert_partitioning(
+ &[
+ "bucket/key-prefix/file0",
+ "bucket/key-prefix/file1",
+ "bucket/key-prefix/file2",
+ "bucket/key-prefix/file3",
+ "bucket/key-prefix/file4",
+ ],
+ "bucket/key-prefix/",
+ 12,
+ 5,
+ )
+ .await?;
+
+ // as many expected partitions as files
+ assert_partitioning(
+ &[
+ "bucket/key-prefix/file0",
+ "bucket/key-prefix/file1",
+ "bucket/key-prefix/file2",
+ "bucket/key-prefix/file3",
+ ],
+ "bucket/key-prefix/",
+ 4,
+ 4,
+ )
+ .await?;
+
+ // more files than expected partitions
+ assert_partitioning(
+ &[
+ "bucket/key-prefix/file0",
+ "bucket/key-prefix/file1",
+ "bucket/key-prefix/file2",
+ "bucket/key-prefix/file3",
+ "bucket/key-prefix/file4",
+ ],
+ "bucket/key-prefix/",
+ 2,
+ 2,
+ )
+ .await?;
+
+ // no files
+ assert_partitioning(&[], "bucket/key-prefix/", 2, 0)
+ .await
+ .expect_err("no files");
+
+ // files that don't match the prefix
+ assert_partitioning(
+ &[
+ "bucket/key-prefix/file0",
+ "bucket/key-prefix/file1",
+ "bucket/other-prefix/roguefile",
+ ],
+ "bucket/key-prefix/",
+ 10,
+ 2,
+ )
+ .await?;
Ok(())
}
@@ -390,13 +443,16 @@ mod tests {
Ok(Arc::new(table))
}
+ /// Check that the files listed by the table match the specified `output_partitioning`
+ /// when the object store contains `files`.
async fn assert_partitioning(
- files_in_folder: usize,
+ files: &[&str],
+ table_prefix: &str,
target_partitions: usize,
output_partitioning: usize,
) -> Result<()> {
let mock_store: Arc<dyn ObjectStore> =
- Arc::new(MockObjectStore { files_in_folder });
+ TestObjectStore::new_arc(&files.iter().map(|f| (*f, 10)).collect::<Vec<_>>());
let format = AvroFormat {};
@@ -412,76 +468,17 @@ mod tests {
let table = ListingTable::new(
Arc::clone(&mock_store),
- "bucket/key-prefix".to_owned(),
+ table_prefix.to_owned(),
Arc::new(schema),
opt,
);
let (file_list, _) = table
- .list_files_for_scan(mock_store, "bucket/key-prefix", &[], None)
+ .list_files_for_scan(mock_store, table_prefix, &[], None)
.await?;
assert_eq!(file_list.len(), output_partitioning);
Ok(())
}
-
- #[derive(Debug)]
- struct MockObjectStore {
- pub files_in_folder: usize,
- }
-
- #[async_trait]
- impl ObjectStore for MockObjectStore {
- async fn list_file(&self, prefix: &str) -> Result<FileMetaStream> {
- let prefix = prefix.to_owned();
- let files = (0..self.files_in_folder).map(move |i| {
- Ok(FileMeta {
- sized_file: SizedFile {
- path: format!("{}file{}", prefix, i),
- size: 100,
- },
- last_modified: None,
- })
- });
- Ok(Box::pin(futures::stream::iter(files)))
- }
-
- async fn list_dir(
- &self,
- _prefix: &str,
- _delimiter: Option<String>,
- ) -> Result<ListEntryStream> {
- unimplemented!()
- }
-
- fn file_reader(&self, _file: SizedFile) -> Result<Arc<dyn ObjectReader>> {
- Ok(Arc::new(MockObjectReader {}))
- }
- }
-
- struct MockObjectReader {}
-
- #[async_trait]
- impl ObjectReader for MockObjectReader {
- async fn chunk_reader(
- &self,
- _start: u64,
- _length: usize,
- ) -> Result<Box<dyn AsyncRead>> {
- unimplemented!()
- }
-
- fn sync_chunk_reader(
- &self,
- _start: u64,
- _length: usize,
- ) -> Result<Box<dyn Read + Send + Sync>> {
- unimplemented!()
- }
-
- fn length(&self) -> u64 {
- unimplemented!()
- }
- }
}
diff --git a/datafusion/src/physical_optimizer/repartition.rs b/datafusion/src/physical_optimizer/repartition.rs
index af47d86..ea7de7f 100644
--- a/datafusion/src/physical_optimizer/repartition.rs
+++ b/datafusion/src/physical_optimizer/repartition.rs
@@ -114,6 +114,7 @@ mod tests {
use crate::physical_plan::file_format::ParquetExec;
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::Statistics;
+ use crate::test::object_store::TestObjectStore;
#[test]
fn added_repartition_to_single_partition() -> Result<()> {
@@ -121,7 +122,7 @@ mod tests {
let parquet_project = ProjectionExec::try_new(
vec![],
Arc::new(ParquetExec::new(
- Arc::new(LocalFileSystem {}),
+ TestObjectStore::new_arc(&[("x", 100)]),
vec![vec![PartitionedFile::new("x".to_string(), 100)]],
Statistics::default(),
schema,
diff --git a/datafusion/src/test/mod.rs b/datafusion/src/test/mod.rs
index c6eae35..4ad4722 100644
--- a/datafusion/src/test/mod.rs
+++ b/datafusion/src/test/mod.rs
@@ -303,5 +303,6 @@ pub fn assert_is_pending<'a, T>(fut: &mut Pin<Box<dyn Future<Output = T> + Send
}
pub mod exec;
+pub mod object_store;
pub mod user_defined;
pub mod variable;
diff --git a/datafusion/src/test/object_store.rs b/datafusion/src/test/object_store.rs
new file mode 100644
index 0000000..4020b99
--- /dev/null
+++ b/datafusion/src/test/object_store.rs
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Object store implementation used for testing
+
+use std::{
+ io,
+ io::{Cursor, Read},
+ sync::Arc,
+};
+
+use crate::{
+ datasource::object_store::{
+ FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore, SizedFile,
+ },
+ error::{DataFusionError, Result},
+};
+use async_trait::async_trait;
+use futures::{stream, AsyncRead, StreamExt};
+
+#[derive(Debug)]
+/// An object store implementation that is useful for testing.
+/// The `ObjectReader`s it returns are filled with zero bytes.
+pub struct TestObjectStore {
+ /// The `(path,size)` of the files that "exist" in the store
+ pub files: Vec<(String, u64)>,
+}
+
+impl TestObjectStore {
+ pub fn new_arc(files: &[(&str, u64)]) -> Arc<Self> {
+ Arc::new(Self {
+ files: files.iter().map(|f| (f.0.to_owned(), f.1)).collect(),
+ })
+ }
+}
+
+#[async_trait]
+impl ObjectStore for TestObjectStore {
+ async fn list_file(&self, prefix: &str) -> Result<FileMetaStream> {
+ let prefix = prefix.to_owned();
+ Ok(Box::pin(
+ stream::iter(
+ self.files
+ .clone()
+ .into_iter()
+ .filter(move |f| f.0.starts_with(&prefix)),
+ )
+ .map(|f| {
+ Ok(FileMeta {
+ sized_file: SizedFile {
+ path: f.0.clone(),
+ size: f.1,
+ },
+ last_modified: None,
+ })
+ }),
+ ))
+ }
+
+ async fn list_dir(
+ &self,
+ _prefix: &str,
+ _delimiter: Option<String>,
+ ) -> Result<ListEntryStream> {
+ unimplemented!()
+ }
+
+ fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>> {
+ match self.files.iter().find(|item| file.path == item.0) {
+ Some((_, size)) if *size == file.size => {
+ Ok(Arc::new(EmptyObjectReader(*size)))
+ }
+ Some(_) => Err(DataFusionError::IoError(io::Error::new(
+ io::ErrorKind::NotFound,
+ "found in test list but wrong size",
+ ))),
+ None => Err(DataFusionError::IoError(io::Error::new(
+ io::ErrorKind::NotFound,
+ "not in provided test list",
+ ))),
+ }
+ }
+}
+
+struct EmptyObjectReader(u64);
+
+#[async_trait]
+impl ObjectReader for EmptyObjectReader {
+ async fn chunk_reader(
+ &self,
+ _start: u64,
+ _length: usize,
+ ) -> Result<Box<dyn AsyncRead>> {
+ unimplemented!()
+ }
+
+ fn sync_chunk_reader(
+ &self,
+ _start: u64,
+ _length: usize,
+ ) -> Result<Box<dyn Read + Send + Sync>> {
+ Ok(Box::new(Cursor::new(vec![0; self.0 as usize])))
+ }
+
+ fn length(&self) -> u64 {
+ self.0
+ }
+}
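
Below is a minimal sketch (not part of this commit) of how the new TestObjectStore can be exercised directly. `new_arc` and `list_file` are as defined in the diff above; the test name, file list, and assertion are illustrative:

    use futures::StreamExt;

    use crate::error::Result;
    use crate::test::object_store::TestObjectStore;

    #[tokio::test]
    async fn test_store_lists_only_matching_prefix() -> Result<()> {
        // Two files under the listed prefix, one outside of it.
        let store = TestObjectStore::new_arc(&[
            ("bucket/key-prefix/file0", 10),
            ("bucket/key-prefix/file1", 10),
            ("bucket/other-prefix/roguefile", 10),
        ]);
        // `list_file` yields a stream of FileMeta results filtered by prefix.
        let files = store
            .list_file("bucket/key-prefix/")
            .await?
            .collect::<Vec<_>>()
            .await;
        assert_eq!(files.len(), 2);
        Ok(())
    }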