This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new d644fae  Remove dependency of common for the storage crate (#2076)
d644fae is described below

commit d644fae6e4d6c80f6c0c829d199e0e8b7935e00a
Author: yahoNanJing <[email protected]>
AuthorDate: Sat Mar 26 00:47:49 2022 +0800

    Remove dependency of common for the storage crate (#2076)
    
    Co-authored-by: yangzhong <[email protected]>
---
 ballista/rust/core/src/serde/logical_plan/mod.rs   | 34 +++++++++-------
 .../core/src/serde/physical_plan/from_proto.rs     |  3 +-
 ballista/rust/core/src/serde/physical_plan/mod.rs  | 17 ++++----
 .../rust/core/src/serde/physical_plan/to_proto.rs  |  2 +-
 datafusion-storage/Cargo.toml                      |  1 -
 datafusion-storage/src/lib.rs                      | 35 ++--------------
 datafusion-storage/src/object_store/local.rs       | 27 ++++++-------
 datafusion-storage/src/object_store/mod.rs         |  7 +---
 datafusion/src/datasource/file_format/avro.rs      |  4 +-
 datafusion/src/datasource/file_format/csv.rs       |  6 +--
 datafusion/src/datasource/file_format/json.rs      |  5 +--
 datafusion/src/datasource/file_format/parquet.rs   |  6 ++-
 datafusion/src/datasource/listing/helpers.rs       | 13 +++---
 datafusion/src/datasource/listing/mod.rs           | 47 ++++++++++++++++++++++
 datafusion/src/datasource/listing/table.rs         |  3 +-
 datafusion/src/datasource/mod.rs                   |  2 +-
 datafusion/src/physical_optimizer/repartition.rs   |  2 +-
 datafusion/src/physical_plan/file_format/avro.rs   |  7 +++-
 datafusion/src/physical_plan/file_format/csv.rs    |  5 +--
 .../src/physical_plan/file_format/file_stream.rs   |  3 +-
 datafusion/src/physical_plan/file_format/json.rs   |  7 +++-
 datafusion/src/physical_plan/file_format/mod.rs    |  3 +-
 .../src/physical_plan/file_format/parquet.rs       | 12 +++---
 datafusion/src/test/mod.rs                         |  6 +--
 datafusion/src/test/object_store.rs                | 17 ++++----
 datafusion/tests/path_partition.rs                 | 18 ++++++---
 26 files changed, 162 insertions(+), 130 deletions(-)

diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index 82e4787..d64fac3 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -885,17 +885,17 @@ mod roundtrip_tests {
     use crate::serde::{AsLogicalPlan, BallistaCodec};
     use async_trait::async_trait;
     use core::panic;
-    use datafusion::datafusion_storage::{
-        object_store::{
-            local::LocalFileSystem, FileMetaStream, ListEntryStream, ObjectReader,
-            ObjectStore,
-        },
-        SizedFile,
-    };
-    use datafusion::datasource::listing::ListingTable;
-    use datafusion::error::DataFusionError;
     use datafusion::{
         arrow::datatypes::{DataType, Field, Schema},
+        datafusion_storage::{
+            self,
+            object_store::{
+                local::LocalFileSystem, FileMetaStream, ListEntryStream, ObjectReader,
+                ObjectStore,
+            },
+            SizedFile,
+        },
+        datasource::listing::ListingTable,
         logical_plan::{
             col, CreateExternalTable, Expr, LogicalPlan, LogicalPlanBuilder, Repartition,
             ToDFSchema,
@@ -903,6 +903,7 @@ mod roundtrip_tests {
         prelude::*,
         sql::parser::FileType,
     };
+    use std::io;
     use std::sync::Arc;
 
     #[derive(Debug)]
@@ -913,8 +914,9 @@ mod roundtrip_tests {
         async fn list_file(
             &self,
             _prefix: &str,
-        ) -> datafusion::error::Result<FileMetaStream> {
-            Err(DataFusionError::NotImplemented(
+        ) -> datafusion_storage::Result<FileMetaStream> {
+            Err(io::Error::new(
+                io::ErrorKind::Unsupported,
                 "this is only a test object store".to_string(),
             ))
         }
@@ -923,8 +925,9 @@ mod roundtrip_tests {
             &self,
             _prefix: &str,
             _delimiter: Option<String>,
-        ) -> datafusion::error::Result<ListEntryStream> {
-            Err(DataFusionError::NotImplemented(
+        ) -> datafusion_storage::Result<ListEntryStream> {
+            Err(io::Error::new(
+                io::ErrorKind::Unsupported,
                 "this is only a test object store".to_string(),
             ))
         }
@@ -932,8 +935,9 @@ mod roundtrip_tests {
         fn file_reader(
             &self,
             _file: SizedFile,
-        ) -> datafusion::error::Result<Arc<dyn ObjectReader>> {
-            Err(DataFusionError::NotImplemented(
+        ) -> datafusion_storage::Result<Arc<dyn ObjectReader>> {
+            Err(io::Error::new(
+                io::ErrorKind::Unsupported,
                 "this is only a test object store".to_string(),
             ))
         }
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index 20205d9..19fdcfa 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -27,8 +27,9 @@ use crate::{convert_box_required, convert_required};
 use chrono::{TimeZone, Utc};
 
 use datafusion::datafusion_storage::{
-    object_store::local::LocalFileSystem, FileMeta, PartitionedFile, SizedFile,
+    object_store::local::LocalFileSystem, FileMeta, SizedFile,
 };
+use datafusion::datasource::listing::PartitionedFile;
 use datafusion::execution::context::ExecutionProps;
 
 use datafusion::physical_plan::file_format::FileScanConfig;
diff --git a/ballista/rust/core/src/serde/physical_plan/mod.rs b/ballista/rust/core/src/serde/physical_plan/mod.rs
index e7d803d..4b91a45 100644
--- a/ballista/rust/core/src/serde/physical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/physical_plan/mod.rs
@@ -34,7 +34,7 @@ use crate::{convert_box_required, convert_required, into_physical_plan, into_req
 use datafusion::arrow::compute::SortOptions;
 use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::datafusion_storage::object_store::local::LocalFileSystem;
-use datafusion::datafusion_storage::PartitionedFile;
+use datafusion::datasource::listing::PartitionedFile;
 use datafusion::logical_plan::window_frames::WindowFrame;
 use datafusion::physical_plan::aggregates::create_aggregate_expr;
 use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
@@ -941,33 +941,30 @@ mod roundtrip_tests {
     use std::sync::Arc;
 
     use crate::serde::{AsExecutionPlan, BallistaCodec};
-    use datafusion::datafusion_storage::{
-        object_store::local::LocalFileSystem, PartitionedFile,
-    };
-    use datafusion::physical_plan::sorts::sort::SortExec;
-    use datafusion::prelude::SessionContext;
     use datafusion::{
         arrow::{
             compute::kernels::sort::SortOptions,
             datatypes::{DataType, Field, Schema},
         },
+        datafusion_storage::object_store::local::LocalFileSystem,
+        datasource::listing::PartitionedFile,
         logical_plan::{JoinType, Operator},
         physical_plan::{
             empty::EmptyExec,
             expressions::{binary, col, lit, InListExpr, NotExpr},
             expressions::{Avg, Column, PhysicalSortExpr},
+            file_format::{FileScanConfig, ParquetExec},
             filter::FilterExec,
             hash_aggregate::{AggregateMode, HashAggregateExec},
             hash_join::{HashJoinExec, PartitionMode},
             limit::{GlobalLimitExec, LocalLimitExec},
-            AggregateExpr, ExecutionPlan, Partitioning, PhysicalExpr,
+            sorts::sort::SortExec,
+            AggregateExpr, ExecutionPlan, Partitioning, PhysicalExpr, Statistics,
         },
+        prelude::SessionContext,
         scalar::ScalarValue,
     };
 
-    use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
-    use datafusion::physical_plan::Statistics;
-
     use super::super::super::error::Result;
     use super::super::protobuf;
     use crate::execution_plans::ShuffleWriterExec;
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index 06605ec..9a63762 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -35,7 +35,7 @@ use datafusion::physical_plan::{
     Statistics,
 };
 
-use datafusion::datafusion_storage::PartitionedFile;
+use datafusion::datasource::listing::PartitionedFile;
 use datafusion::physical_plan::file_format::FileScanConfig;
 
 use datafusion::physical_plan::expressions::{Count, Literal};
diff --git a/datafusion-storage/Cargo.toml b/datafusion-storage/Cargo.toml
index 377a087..44e9152 100644
--- a/datafusion-storage/Cargo.toml
+++ b/datafusion-storage/Cargo.toml
@@ -35,7 +35,6 @@ path = "src/lib.rs"
 [dependencies]
 async-trait = "0.1.41"
 chrono = { version = "0.4", default-features = false }
-datafusion-common = { path = "../datafusion-common", version = "7.0.0" }
 futures = "0.3"
 parking_lot = "0.12"
 tempfile = "3"
diff --git a/datafusion-storage/src/lib.rs b/datafusion-storage/src/lib.rs
index ce6e177..5da690a 100644
--- a/datafusion-storage/src/lib.rs
+++ b/datafusion-storage/src/lib.rs
@@ -18,7 +18,10 @@
 pub mod object_store;
 
 use chrono::{DateTime, Utc};
-use datafusion_common::ScalarValue;
+use std::{io, result};
+
+/// Result type for operations that could result in an io error
+pub type Result<T> = result::Result<T, io::Error>;
 
 /// Represents a specific file or a prefix (folder) that may
 /// require further resolution
@@ -72,33 +75,3 @@ impl std::fmt::Display for FileMeta {
         write!(f, "{} (size: {})", self.path(), self.size())
     }
 }
-
-#[derive(Debug, Clone)]
-/// A single file that should be read, along with its schema, statistics
-/// and partition column values that need to be appended to each row.
-pub struct PartitionedFile {
-    /// Path for the file (e.g. URL, filesystem path, etc)
-    pub file_meta: FileMeta,
-    /// Values of partition columns to be appended to each row
-    pub partition_values: Vec<ScalarValue>,
-    // We may include row group range here for a more fine-grained parallel execution
-}
-
-impl PartitionedFile {
-    /// Create a simple file without metadata or partition
-    pub fn new(path: String, size: u64) -> Self {
-        Self {
-            file_meta: FileMeta {
-                sized_file: SizedFile { path, size },
-                last_modified: None,
-            },
-            partition_values: vec![],
-        }
-    }
-}
-
-impl std::fmt::Display for PartitionedFile {
-    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        write!(f, "{}", self.file_meta)
-    }
-}
diff --git a/datafusion-storage/src/object_store/local.rs b/datafusion-storage/src/object_store/local.rs
index 540996e..f4872ae 100644
--- a/datafusion-storage/src/object_store/local.rs
+++ b/datafusion-storage/src/object_store/local.rs
@@ -18,15 +18,14 @@
 //! Object store that represents the Local File System.
 
 use std::fs::{self, File, Metadata};
+use std::io;
 use std::io::{BufReader, Read, Seek, SeekFrom};
 use std::sync::Arc;
 
 use async_trait::async_trait;
 use futures::{stream, AsyncRead, StreamExt};
 
-use datafusion_common::{DataFusionError, Result};
-
-use crate::{FileMeta, PartitionedFile, SizedFile};
+use crate::{FileMeta, Result, SizedFile};
 
 use super::{
     FileMetaStream, ListEntryStream, ObjectReader, ObjectReaderStream, ObjectStore,
@@ -131,7 +130,10 @@ async fn list_all(prefix: String) -> Result<FileMetaStream> {
                     files.push(get_meta(child_path.to_owned(), metadata))
                 }
             } else {
-                return Err(DataFusionError::Plan("Invalid path".to_string()));
+                return Err(io::Error::new(
+                    io::ErrorKind::InvalidInput,
+                    "Invalid path".to_string(),
+                ));
             }
         }
         Ok(files)
@@ -171,22 +173,19 @@ pub fn local_object_reader_stream(files: Vec<String>) -> ObjectReaderStream {
 /// Helper method to convert a file location to a `LocalFileReader`
 pub fn local_object_reader(file: String) -> Arc<dyn ObjectReader> {
     LocalFileSystem
-        .file_reader(local_unpartitioned_file(file).file_meta.sized_file)
+        .file_reader(local_unpartitioned_file(file).sized_file)
         .expect("File not found")
 }
 
 /// Helper method to fetch the file size and date at given path and create a `FileMeta`
-pub fn local_unpartitioned_file(file: String) -> PartitionedFile {
+pub fn local_unpartitioned_file(file: String) -> FileMeta {
     let metadata = fs::metadata(&file).expect("Local file metadata");
-    PartitionedFile {
-        file_meta: FileMeta {
-            sized_file: SizedFile {
-                size: metadata.len(),
-                path: file,
-            },
-            last_modified: metadata.modified().map(chrono::DateTime::from).ok(),
+    FileMeta {
+        sized_file: SizedFile {
+            size: metadata.len(),
+            path: file,
         },
-        partition_values: vec![],
+        last_modified: metadata.modified().map(chrono::DateTime::from).ok(),
     }
 }
 
diff --git a/datafusion-storage/src/object_store/mod.rs b/datafusion-storage/src/object_store/mod.rs
index 0479648..5d2f76e 100644
--- a/datafusion-storage/src/object_store/mod.rs
+++ b/datafusion-storage/src/object_store/mod.rs
@@ -27,17 +27,12 @@ use std::sync::Arc;
 use async_trait::async_trait;
 use futures::{AsyncRead, Stream, StreamExt};
 
-use crate::{FileMeta, ListEntry, PartitionedFile, SizedFile};
-use datafusion_common::Result;
+use crate::{FileMeta, ListEntry, Result, SizedFile};
 
 /// Stream of files listed from object store
 pub type FileMetaStream =
     Pin<Box<dyn Stream<Item = Result<FileMeta>> + Send + Sync + 'static>>;
 
-/// Stream of files get listed from object store
-pub type PartitionedFileStream =
-    Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>;
-
 /// Stream of list entries obtained from object store
 pub type ListEntryStream =
     Pin<Box<dyn Stream<Item = Result<ListEntry>> + Send + Sync + 'static>>;
diff --git a/datafusion/src/datasource/file_format/avro.rs b/datafusion/src/datasource/file_format/avro.rs
index 0fc9c7a..f54650f 100644
--- a/datafusion/src/datasource/file_format/avro.rs
+++ b/datafusion/src/datasource/file_format/avro.rs
@@ -76,13 +76,13 @@ impl FileFormat for AvroFormat {
 mod tests {
     use crate::{
         datafusion_storage::object_store::local::{
-            local_object_reader, local_object_reader_stream, local_unpartitioned_file,
-            LocalFileSystem,
+            local_object_reader, local_object_reader_stream, LocalFileSystem,
         },
         physical_plan::collect,
     };
 
     use super::*;
+    use crate::datasource::listing::local_unpartitioned_file;
     use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
     use crate::prelude::{SessionConfig, SessionContext};
     use arrow::array::{
diff --git a/datafusion/src/datasource/file_format/csv.rs b/datafusion/src/datasource/file_format/csv.rs
index 41f6df1..955bc0f 100644
--- a/datafusion/src/datasource/file_format/csv.rs
+++ b/datafusion/src/datasource/file_format/csv.rs
@@ -138,11 +138,11 @@ mod tests {
     use arrow::array::StringArray;
 
     use super::*;
+    use crate::datasource::listing::local_unpartitioned_file;
     use crate::prelude::{SessionConfig, SessionContext};
     use crate::{
         datafusion_storage::object_store::local::{
-            local_object_reader, local_object_reader_stream, local_unpartitioned_file,
-            LocalFileSystem,
+            local_object_reader, local_object_reader_stream, LocalFileSystem,
         },
         datasource::file_format::FileScanConfig,
         physical_plan::collect,
@@ -271,7 +271,7 @@ mod tests {
         let exec = format
             .create_physical_plan(
                 FileScanConfig {
-                    object_store: Arc::new(LocalFileSystem {}),
+                    object_store: Arc::new(LocalFileSystem),
                     file_schema,
                     file_groups,
                     statistics,
diff --git a/datafusion/src/datasource/file_format/json.rs b/datafusion/src/datasource/file_format/json.rs
index 01e7982..0a347fa 100644
--- a/datafusion/src/datasource/file_format/json.rs
+++ b/datafusion/src/datasource/file_format/json.rs
@@ -103,10 +103,9 @@ mod tests {
     use crate::prelude::{SessionConfig, SessionContext};
     use crate::{
         datafusion_storage::object_store::local::{
-            local_object_reader, local_object_reader_stream, local_unpartitioned_file,
-            LocalFileSystem,
+            local_object_reader, local_object_reader_stream, LocalFileSystem,
         },
-        datasource::file_format::FileScanConfig,
+        datasource::{file_format::FileScanConfig, listing::local_unpartitioned_file},
         physical_plan::collect,
     };
 
diff --git a/datafusion/src/datasource/file_format/parquet.rs b/datafusion/src/datasource/file_format/parquet.rs
index 08998d2..374e360 100644
--- a/datafusion/src/datasource/file_format/parquet.rs
+++ b/datafusion/src/datasource/file_format/parquet.rs
@@ -89,6 +89,7 @@ impl FileFormat for ParquetFormat {
 
     async fn infer_schema(&self, readers: ObjectReaderStream) -> Result<SchemaRef> {
         let merged_schema = readers
+            .map_err(DataFusionError::IoError)
             .try_fold(Schema::empty(), |acc, reader| async {
                 let next_schema = fetch_schema(reader);
                 Schema::try_merge([acc, next_schema?])
@@ -351,16 +352,17 @@ impl ChunkReader for ChunkObjectReader {
     fn get_read(&self, start: u64, length: usize) -> ParquetResult<Self::T> {
         self.0
             .sync_chunk_reader(start, length)
+            .map_err(DataFusionError::IoError)
             .map_err(|e| ParquetError::ArrowError(e.to_string()))
     }
 }
 
 #[cfg(test)]
 mod tests {
+    use crate::datasource::listing::local_unpartitioned_file;
     use crate::physical_plan::collect;
     use datafusion_storage::object_store::local::{
-        local_object_reader, local_object_reader_stream, local_unpartitioned_file,
-        LocalFileSystem,
+        local_object_reader, local_object_reader_stream, LocalFileSystem,
     };
 
     use super::*;
diff --git a/datafusion/src/datasource/listing/helpers.rs b/datafusion/src/datasource/listing/helpers.rs
index 2657474..10b3b48 100644
--- a/datafusion/src/datasource/listing/helpers.rs
+++ b/datafusion/src/datasource/listing/helpers.rs
@@ -29,6 +29,7 @@ use arrow::{
     record_batch::RecordBatch,
 };
 use chrono::{TimeZone, Utc};
+use datafusion_common::DataFusionError;
 use futures::{
     stream::{self},
     StreamExt, TryStreamExt,
@@ -44,10 +45,8 @@ use crate::{
     scalar::ScalarValue,
 };
 
-use datafusion_storage::{
-    object_store::{ObjectStore, PartitionedFileStream},
-    FileMeta, PartitionedFile, SizedFile,
-};
+use super::{PartitionedFile, PartitionedFileStream};
+use datafusion_storage::{object_store::ObjectStore, FileMeta, SizedFile};
 
 const FILE_SIZE_COLUMN_NAME: &str = "_df_part_file_size_";
 const FILE_PATH_COLUMN_NAME: &str = "_df_part_file_path_";
@@ -234,7 +233,11 @@ pub async fn pruned_partition_list(
             // store if we switch to a streaming-stlye pruning of the files. For instance S3 lists
             // 1000 items at a time so batches of 1000 would be ideal with S3 as store.
             .chunks(1024)
-            .map(|v| v.into_iter().collect::<Result<Vec<_>>>())
+            .map(|v| {
+                v.into_iter()
+                    .collect::<datafusion_storage::Result<Vec<_>>>()
+            })
+            .map_err(DataFusionError::IoError)
             .map(move |metas| paths_to_batch(table_partition_cols, &stream_path, &metas?))
             .try_collect()
             .await?;
diff --git a/datafusion/src/datasource/listing/mod.rs b/datafusion/src/datasource/listing/mod.rs
index 2f1d034..2a2042e 100644
--- a/datafusion/src/datasource/listing/mod.rs
+++ b/datafusion/src/datasource/listing/mod.rs
@@ -21,4 +21,51 @@
 mod helpers;
 mod table;
 
+use datafusion_common::ScalarValue;
+use datafusion_storage::{object_store::local, FileMeta, Result, SizedFile};
+use futures::Stream;
+use std::pin::Pin;
+
 pub use table::{ListingOptions, ListingTable, ListingTableConfig};
+
+/// Stream of files get listed from object store
+pub type PartitionedFileStream =
+    Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>;
+
+#[derive(Debug, Clone)]
+/// A single file that should be read, along with its schema, statistics
+/// and partition column values that need to be appended to each row.
+pub struct PartitionedFile {
+    /// Path for the file (e.g. URL, filesystem path, etc)
+    pub file_meta: FileMeta,
+    /// Values of partition columns to be appended to each row
+    pub partition_values: Vec<ScalarValue>,
+    // We may include row group range here for a more fine-grained parallel execution
+}
+
+impl PartitionedFile {
+    /// Create a simple file without metadata or partition
+    pub fn new(path: String, size: u64) -> Self {
+        Self {
+            file_meta: FileMeta {
+                sized_file: SizedFile { path, size },
+                last_modified: None,
+            },
+            partition_values: vec![],
+        }
+    }
+}
+
+impl std::fmt::Display for PartitionedFile {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{}", self.file_meta)
+    }
+}
+
+/// Helper method to fetch the file size and date at given path and create a `FileMeta`
+pub fn local_unpartitioned_file(file: String) -> PartitionedFile {
+    PartitionedFile {
+        file_meta: local::local_unpartitioned_file(file),
+        partition_values: vec![],
+    }
+}
diff --git a/datafusion/src/datasource/listing/table.rs b/datafusion/src/datasource/listing/table.rs
index 440bbba..42b836c 100644
--- a/datafusion/src/datasource/listing/table.rs
+++ b/datafusion/src/datasource/listing/table.rs
@@ -42,7 +42,8 @@ use crate::datasource::{
     get_statistics_with_limit, TableProvider,
 };
 
-use datafusion_storage::{object_store::ObjectStore, PartitionedFile};
+use super::PartitionedFile;
+use datafusion_storage::object_store::ObjectStore;
 
 use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files};
 
diff --git a/datafusion/src/datasource/mod.rs b/datafusion/src/datasource/mod.rs
index 24bc687..2a801ff 100644
--- a/datafusion/src/datasource/mod.rs
+++ b/datafusion/src/datasource/mod.rs
@@ -28,12 +28,12 @@ pub mod object_store_registry;
 use futures::Stream;
 
 pub use self::datasource::{TableProvider, TableType};
+use self::listing::PartitionedFile;
 pub use self::memory::MemTable;
 use crate::arrow::datatypes::{Schema, SchemaRef};
 use crate::error::Result;
 use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
 use crate::physical_plan::{Accumulator, ColumnStatistics, Statistics};
-use datafusion_storage::PartitionedFile;
 use futures::StreamExt;
 
 /// Get all files as well as the file level summary statistics (no statistic for partition columns).
diff --git a/datafusion/src/physical_optimizer/repartition.rs b/datafusion/src/physical_optimizer/repartition.rs
index b0dbc77..d98fa41 100644
--- a/datafusion/src/physical_optimizer/repartition.rs
+++ b/datafusion/src/physical_optimizer/repartition.rs
@@ -238,7 +238,7 @@ mod tests {
     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 
     use super::*;
-    use crate::datafusion_storage::PartitionedFile;
+    use crate::datasource::listing::PartitionedFile;
     use crate::physical_plan::expressions::{col, PhysicalSortExpr};
     use crate::physical_plan::file_format::{FileScanConfig, ParquetExec};
     use crate::physical_plan::filter::FilterExec;
diff --git a/datafusion/src/physical_plan/file_format/avro.rs b/datafusion/src/physical_plan/file_format/avro.rs
index 3ee23a9..3f6b857 100644
--- a/datafusion/src/physical_plan/file_format/avro.rs
+++ b/datafusion/src/physical_plan/file_format/avro.rs
@@ -174,9 +174,12 @@ impl ExecutionPlan for AvroExec {
 #[cfg(test)]
 #[cfg(feature = "avro")]
 mod tests {
-    use crate::datasource::file_format::{avro::AvroFormat, FileFormat};
     use crate::datasource::object_store::local::{
-        local_object_reader_stream, local_unpartitioned_file, LocalFileSystem,
+        local_object_reader_stream, LocalFileSystem,
+    };
+    use crate::datasource::{
+        file_format::{avro::AvroFormat, FileFormat},
+        listing::local_unpartitioned_file,
     };
     use crate::scalar::ScalarValue;
     use arrow::datatypes::{DataType, Field, Schema};
diff --git a/datafusion/src/physical_plan/file_format/csv.rs b/datafusion/src/physical_plan/file_format/csv.rs
index f124e70..28466b8 100644
--- a/datafusion/src/physical_plan/file_format/csv.rs
+++ b/datafusion/src/physical_plan/file_format/csv.rs
@@ -221,9 +221,8 @@ pub async fn plan_to_csv(
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::datafusion_storage::object_store::local::{
-        local_unpartitioned_file, LocalFileSystem,
-    };
+    use crate::datafusion_storage::object_store::local::LocalFileSystem;
+    use crate::datasource::listing::local_unpartitioned_file;
     use crate::prelude::*;
     use crate::test_util::aggr_test_schema_with_missing_col;
     use crate::{scalar::ScalarValue, test_util::aggr_test_schema};
diff --git a/datafusion/src/physical_plan/file_format/file_stream.rs b/datafusion/src/physical_plan/file_format/file_stream.rs
index 6d8fa5f..78222ee 100644
--- a/datafusion/src/physical_plan/file_format/file_stream.rs
+++ b/datafusion/src/physical_plan/file_format/file_stream.rs
@@ -21,13 +21,14 @@
 //! Note: Most traits here need to be marked `Sync + Send` to be
 //! compliant with the `SendableRecordBatchStream` trait.
 
+use crate::datasource::listing::PartitionedFile;
 use crate::{physical_plan::RecordBatchStream, scalar::ScalarValue};
 use arrow::{
     datatypes::SchemaRef,
     error::{ArrowError, Result as ArrowResult},
     record_batch::RecordBatch,
 };
-use datafusion_storage::{object_store::ObjectStore, PartitionedFile};
+use datafusion_storage::object_store::ObjectStore;
 use futures::Stream;
 use std::{
     io::Read,
diff --git a/datafusion/src/physical_plan/file_format/json.rs b/datafusion/src/physical_plan/file_format/json.rs
index e96ff8b..4eb2fda 100644
--- a/datafusion/src/physical_plan/file_format/json.rs
+++ b/datafusion/src/physical_plan/file_format/json.rs
@@ -194,9 +194,12 @@ mod tests {
     use futures::StreamExt;
 
     use crate::datafusion_storage::object_store::local::{
-        local_object_reader_stream, local_unpartitioned_file, LocalFileSystem,
+        local_object_reader_stream, LocalFileSystem,
+    };
+    use crate::datasource::{
+        file_format::{json::JsonFormat, FileFormat},
+        listing::local_unpartitioned_file,
     };
-    use crate::datasource::file_format::{json::JsonFormat, FileFormat};
     use crate::prelude::NdJsonReadOptions;
     use crate::prelude::*;
     use tempfile::TempDir;
diff --git a/datafusion/src/physical_plan/file_format/mod.rs b/datafusion/src/physical_plan/file_format/mod.rs
index 8e150e8..78d1dc2 100644
--- a/datafusion/src/physical_plan/file_format/mod.rs
+++ b/datafusion/src/physical_plan/file_format/mod.rs
@@ -38,12 +38,13 @@ pub use csv::CsvExec;
 pub(crate) use json::plan_to_json;
 pub use json::NdJsonExec;
 
+use crate::datasource::listing::PartitionedFile;
 use crate::{
     error::{DataFusionError, Result},
     scalar::ScalarValue,
 };
 use arrow::array::{new_null_array, UInt16BufferBuilder};
-use datafusion_storage::{object_store::ObjectStore, PartitionedFile};
+use datafusion_storage::object_store::ObjectStore;
 use lazy_static::lazy_static;
 use log::info;
 use std::{
diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs
index cda6727..c77e7d8 100644
--- a/datafusion/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/src/physical_plan/file_format/parquet.rs
@@ -41,7 +41,7 @@ use crate::{
 };
 use datafusion_common::Column;
 use datafusion_expr::Expr;
-use datafusion_storage::{object_store::ObjectStore, PartitionedFile};
+use datafusion_storage::object_store::ObjectStore;
 
 use arrow::{
     array::ArrayRef,
@@ -66,6 +66,7 @@ use tokio::{
     task,
 };
 
+use crate::datasource::listing::PartitionedFile;
 use crate::physical_plan::file_format::SchemaAdapter;
 use async_trait::async_trait;
 
@@ -580,12 +581,13 @@ mod tests {
     use crate::{
         assert_batches_sorted_eq, assert_contains,
         datafusion_storage::{
-            object_store::local::{
-                local_object_reader_stream, local_unpartitioned_file, LocalFileSystem,
-            },
+            object_store::local::{local_object_reader_stream, LocalFileSystem},
             FileMeta, SizedFile,
         },
-        datasource::file_format::{parquet::ParquetFormat, FileFormat},
+        datasource::{
+            file_format::{parquet::ParquetFormat, FileFormat},
+            listing::local_unpartitioned_file,
+        },
         physical_plan::collect,
     };
 
diff --git a/datafusion/src/test/mod.rs b/datafusion/src/test/mod.rs
index b96d981..4bdf6d6 100644
--- a/datafusion/src/test/mod.rs
+++ b/datafusion/src/test/mod.rs
@@ -18,10 +18,10 @@
 //! Common unit test utility methods
 
 use crate::arrow::array::UInt32Array;
-use crate::datafusion_storage::{
-    object_store::local::local_unpartitioned_file, PartitionedFile,
+use crate::datasource::{
+    listing::{local_unpartitioned_file, PartitionedFile},
+    MemTable, TableProvider,
 };
-use crate::datasource::{MemTable, TableProvider};
 use crate::error::Result;
 use crate::from_slice::FromSlice;
 use crate::logical_plan::{LogicalPlan, LogicalPlanBuilder};
diff --git a/datafusion/src/test/object_store.rs b/datafusion/src/test/object_store.rs
index 9646890..61b4265 100644
--- a/datafusion/src/test/object_store.rs
+++ b/datafusion/src/test/object_store.rs
@@ -22,12 +22,9 @@ use std::{
     sync::Arc,
 };
 
-use crate::{
-    datafusion_storage::{
-        object_store::{FileMetaStream, ListEntryStream, ObjectReader, ObjectStore},
-        FileMeta, SizedFile,
-    },
-    error::{DataFusionError, Result},
+use crate::datafusion_storage::{
+    object_store::{FileMetaStream, ListEntryStream, ObjectReader, ObjectStore},
+    FileMeta, Result, SizedFile,
 };
 use async_trait::async_trait;
 use futures::{stream, AsyncRead, StreamExt};
@@ -84,14 +81,14 @@ impl ObjectStore for TestObjectStore {
             Some((_, size)) if *size == file.size => {
                 Ok(Arc::new(EmptyObjectReader(*size)))
             }
-            Some(_) => Err(DataFusionError::IoError(io::Error::new(
+            Some(_) => Err(io::Error::new(
                 io::ErrorKind::NotFound,
                 "found in test list but wrong size",
-            ))),
-            None => Err(DataFusionError::IoError(io::Error::new(
+            )),
+            None => Err(io::Error::new(
                 io::ErrorKind::NotFound,
                 "not in provided test list",
-            ))),
+            )),
         }
     }
 }
diff --git a/datafusion/tests/path_partition.rs b/datafusion/tests/path_partition.rs
index 3e46049..8475ea5 100644
--- a/datafusion/tests/path_partition.rs
+++ b/datafusion/tests/path_partition.rs
@@ -33,7 +33,7 @@ use datafusion::{
         file_format::{csv::CsvFormat, parquet::ParquetFormat},
         listing::{ListingOptions, ListingTable, ListingTableConfig},
     },
-    error::{DataFusionError, Result},
+    error::Result,
     physical_plan::ColumnStatistics,
     prelude::SessionContext,
     test_util::{self, arrow_test_data, parquet_test_data},
@@ -352,7 +352,10 @@ impl MirroringObjectStore {
 
 #[async_trait]
 impl ObjectStore for MirroringObjectStore {
-    async fn list_file(&self, prefix: &str) -> Result<FileMetaStream> {
+    async fn list_file(
+        &self,
+        prefix: &str,
+    ) -> datafusion_storage::Result<FileMetaStream> {
         let prefix = prefix.to_owned();
         let size = self.file_size;
         Ok(Box::pin(
@@ -375,11 +378,14 @@ impl ObjectStore for MirroringObjectStore {
         &self,
         _prefix: &str,
         _delimiter: Option<String>,
-    ) -> Result<ListEntryStream> {
+    ) -> datafusion_storage::Result<ListEntryStream> {
         unimplemented!()
     }
 
-    fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>> {
+    fn file_reader(
+        &self,
+        file: SizedFile,
+    ) -> datafusion_storage::Result<Arc<dyn ObjectReader>> {
         assert_eq!(
             self.file_size, file.size,
             "Requested files should have the same size as the mirrored file"
@@ -389,10 +395,10 @@ impl ObjectStore for MirroringObjectStore {
                 path: self.mirrored_file.clone(),
                 size: self.file_size,
             })?),
-            None => Err(DataFusionError::IoError(io::Error::new(
+            None => Err(io::Error::new(
                 io::ErrorKind::NotFound,
                 "not in provided test list",
-            ))),
+            )),
         }
     }
 }

Reply via email to