This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new d644fae Remove dependency of common for the storage crate (#2076)
d644fae is described below
commit d644fae6e4d6c80f6c0c829d199e0e8b7935e00a
Author: yahoNanJing <[email protected]>
AuthorDate: Sat Mar 26 00:47:49 2022 +0800
Remove dependency of common for the storage crate (#2076)
Co-authored-by: yangzhong <[email protected]>
---
ballista/rust/core/src/serde/logical_plan/mod.rs | 34 +++++++++-------
.../core/src/serde/physical_plan/from_proto.rs | 3 +-
ballista/rust/core/src/serde/physical_plan/mod.rs | 17 ++++----
.../rust/core/src/serde/physical_plan/to_proto.rs | 2 +-
datafusion-storage/Cargo.toml | 1 -
datafusion-storage/src/lib.rs | 35 ++--------------
datafusion-storage/src/object_store/local.rs | 27 ++++++-------
datafusion-storage/src/object_store/mod.rs | 7 +---
datafusion/src/datasource/file_format/avro.rs | 4 +-
datafusion/src/datasource/file_format/csv.rs | 6 +--
datafusion/src/datasource/file_format/json.rs | 5 +--
datafusion/src/datasource/file_format/parquet.rs | 6 ++-
datafusion/src/datasource/listing/helpers.rs | 13 +++---
datafusion/src/datasource/listing/mod.rs | 47 ++++++++++++++++++++++
datafusion/src/datasource/listing/table.rs | 3 +-
datafusion/src/datasource/mod.rs | 2 +-
datafusion/src/physical_optimizer/repartition.rs | 2 +-
datafusion/src/physical_plan/file_format/avro.rs | 7 +++-
datafusion/src/physical_plan/file_format/csv.rs | 5 +--
.../src/physical_plan/file_format/file_stream.rs | 3 +-
datafusion/src/physical_plan/file_format/json.rs | 7 +++-
datafusion/src/physical_plan/file_format/mod.rs | 3 +-
.../src/physical_plan/file_format/parquet.rs | 12 +++---
datafusion/src/test/mod.rs | 6 +--
datafusion/src/test/object_store.rs | 17 ++++----
datafusion/tests/path_partition.rs | 18 ++++++---
26 files changed, 162 insertions(+), 130 deletions(-)
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs
b/ballista/rust/core/src/serde/logical_plan/mod.rs
index 82e4787..d64fac3 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -885,17 +885,17 @@ mod roundtrip_tests {
use crate::serde::{AsLogicalPlan, BallistaCodec};
use async_trait::async_trait;
use core::panic;
- use datafusion::datafusion_storage::{
- object_store::{
- local::LocalFileSystem, FileMetaStream, ListEntryStream,
ObjectReader,
- ObjectStore,
- },
- SizedFile,
- };
- use datafusion::datasource::listing::ListingTable;
- use datafusion::error::DataFusionError;
use datafusion::{
arrow::datatypes::{DataType, Field, Schema},
+ datafusion_storage::{
+ self,
+ object_store::{
+ local::LocalFileSystem, FileMetaStream, ListEntryStream,
ObjectReader,
+ ObjectStore,
+ },
+ SizedFile,
+ },
+ datasource::listing::ListingTable,
logical_plan::{
col, CreateExternalTable, Expr, LogicalPlan, LogicalPlanBuilder,
Repartition,
ToDFSchema,
@@ -903,6 +903,7 @@ mod roundtrip_tests {
prelude::*,
sql::parser::FileType,
};
+ use std::io;
use std::sync::Arc;
#[derive(Debug)]
@@ -913,8 +914,9 @@ mod roundtrip_tests {
async fn list_file(
&self,
_prefix: &str,
- ) -> datafusion::error::Result<FileMetaStream> {
- Err(DataFusionError::NotImplemented(
+ ) -> datafusion_storage::Result<FileMetaStream> {
+ Err(io::Error::new(
+ io::ErrorKind::Unsupported,
"this is only a test object store".to_string(),
))
}
@@ -923,8 +925,9 @@ mod roundtrip_tests {
&self,
_prefix: &str,
_delimiter: Option<String>,
- ) -> datafusion::error::Result<ListEntryStream> {
- Err(DataFusionError::NotImplemented(
+ ) -> datafusion_storage::Result<ListEntryStream> {
+ Err(io::Error::new(
+ io::ErrorKind::Unsupported,
"this is only a test object store".to_string(),
))
}
@@ -932,8 +935,9 @@ mod roundtrip_tests {
fn file_reader(
&self,
_file: SizedFile,
- ) -> datafusion::error::Result<Arc<dyn ObjectReader>> {
- Err(DataFusionError::NotImplemented(
+ ) -> datafusion_storage::Result<Arc<dyn ObjectReader>> {
+ Err(io::Error::new(
+ io::ErrorKind::Unsupported,
"this is only a test object store".to_string(),
))
}
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index 20205d9..19fdcfa 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -27,8 +27,9 @@ use crate::{convert_box_required, convert_required};
use chrono::{TimeZone, Utc};
use datafusion::datafusion_storage::{
- object_store::local::LocalFileSystem, FileMeta, PartitionedFile, SizedFile,
+ object_store::local::LocalFileSystem, FileMeta, SizedFile,
};
+use datafusion::datasource::listing::PartitionedFile;
use datafusion::execution::context::ExecutionProps;
use datafusion::physical_plan::file_format::FileScanConfig;
diff --git a/ballista/rust/core/src/serde/physical_plan/mod.rs
b/ballista/rust/core/src/serde/physical_plan/mod.rs
index e7d803d..4b91a45 100644
--- a/ballista/rust/core/src/serde/physical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/physical_plan/mod.rs
@@ -34,7 +34,7 @@ use crate::{convert_box_required, convert_required,
into_physical_plan, into_req
use datafusion::arrow::compute::SortOptions;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::datafusion_storage::object_store::local::LocalFileSystem;
-use datafusion::datafusion_storage::PartitionedFile;
+use datafusion::datasource::listing::PartitionedFile;
use datafusion::logical_plan::window_frames::WindowFrame;
use datafusion::physical_plan::aggregates::create_aggregate_expr;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
@@ -941,33 +941,30 @@ mod roundtrip_tests {
use std::sync::Arc;
use crate::serde::{AsExecutionPlan, BallistaCodec};
- use datafusion::datafusion_storage::{
- object_store::local::LocalFileSystem, PartitionedFile,
- };
- use datafusion::physical_plan::sorts::sort::SortExec;
- use datafusion::prelude::SessionContext;
use datafusion::{
arrow::{
compute::kernels::sort::SortOptions,
datatypes::{DataType, Field, Schema},
},
+ datafusion_storage::object_store::local::LocalFileSystem,
+ datasource::listing::PartitionedFile,
logical_plan::{JoinType, Operator},
physical_plan::{
empty::EmptyExec,
expressions::{binary, col, lit, InListExpr, NotExpr},
expressions::{Avg, Column, PhysicalSortExpr},
+ file_format::{FileScanConfig, ParquetExec},
filter::FilterExec,
hash_aggregate::{AggregateMode, HashAggregateExec},
hash_join::{HashJoinExec, PartitionMode},
limit::{GlobalLimitExec, LocalLimitExec},
- AggregateExpr, ExecutionPlan, Partitioning, PhysicalExpr,
+ sorts::sort::SortExec,
+ AggregateExpr, ExecutionPlan, Partitioning, PhysicalExpr,
Statistics,
},
+ prelude::SessionContext,
scalar::ScalarValue,
};
- use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
- use datafusion::physical_plan::Statistics;
-
use super::super::super::error::Result;
use super::super::protobuf;
use crate::execution_plans::ShuffleWriterExec;
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index 06605ec..9a63762 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -35,7 +35,7 @@ use datafusion::physical_plan::{
Statistics,
};
-use datafusion::datafusion_storage::PartitionedFile;
+use datafusion::datasource::listing::PartitionedFile;
use datafusion::physical_plan::file_format::FileScanConfig;
use datafusion::physical_plan::expressions::{Count, Literal};
diff --git a/datafusion-storage/Cargo.toml b/datafusion-storage/Cargo.toml
index 377a087..44e9152 100644
--- a/datafusion-storage/Cargo.toml
+++ b/datafusion-storage/Cargo.toml
@@ -35,7 +35,6 @@ path = "src/lib.rs"
[dependencies]
async-trait = "0.1.41"
chrono = { version = "0.4", default-features = false }
-datafusion-common = { path = "../datafusion-common", version = "7.0.0" }
futures = "0.3"
parking_lot = "0.12"
tempfile = "3"
diff --git a/datafusion-storage/src/lib.rs b/datafusion-storage/src/lib.rs
index ce6e177..5da690a 100644
--- a/datafusion-storage/src/lib.rs
+++ b/datafusion-storage/src/lib.rs
@@ -18,7 +18,10 @@
pub mod object_store;
use chrono::{DateTime, Utc};
-use datafusion_common::ScalarValue;
+use std::{io, result};
+
+/// Result type for operations that could result in an io error
+pub type Result<T> = result::Result<T, io::Error>;
/// Represents a specific file or a prefix (folder) that may
/// require further resolution
@@ -72,33 +75,3 @@ impl std::fmt::Display for FileMeta {
write!(f, "{} (size: {})", self.path(), self.size())
}
}
-
-#[derive(Debug, Clone)]
-/// A single file that should be read, along with its schema, statistics
-/// and partition column values that need to be appended to each row.
-pub struct PartitionedFile {
- /// Path for the file (e.g. URL, filesystem path, etc)
- pub file_meta: FileMeta,
- /// Values of partition columns to be appended to each row
- pub partition_values: Vec<ScalarValue>,
- // We may include row group range here for a more fine-grained parallel
execution
-}
-
-impl PartitionedFile {
- /// Create a simple file without metadata or partition
- pub fn new(path: String, size: u64) -> Self {
- Self {
- file_meta: FileMeta {
- sized_file: SizedFile { path, size },
- last_modified: None,
- },
- partition_values: vec![],
- }
- }
-}
-
-impl std::fmt::Display for PartitionedFile {
- fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
- write!(f, "{}", self.file_meta)
- }
-}
diff --git a/datafusion-storage/src/object_store/local.rs
b/datafusion-storage/src/object_store/local.rs
index 540996e..f4872ae 100644
--- a/datafusion-storage/src/object_store/local.rs
+++ b/datafusion-storage/src/object_store/local.rs
@@ -18,15 +18,14 @@
//! Object store that represents the Local File System.
use std::fs::{self, File, Metadata};
+use std::io;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::sync::Arc;
use async_trait::async_trait;
use futures::{stream, AsyncRead, StreamExt};
-use datafusion_common::{DataFusionError, Result};
-
-use crate::{FileMeta, PartitionedFile, SizedFile};
+use crate::{FileMeta, Result, SizedFile};
use super::{
FileMetaStream, ListEntryStream, ObjectReader, ObjectReaderStream,
ObjectStore,
@@ -131,7 +130,10 @@ async fn list_all(prefix: String) ->
Result<FileMetaStream> {
files.push(get_meta(child_path.to_owned(), metadata))
}
} else {
- return Err(DataFusionError::Plan("Invalid path".to_string()));
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "Invalid path".to_string(),
+ ));
}
}
Ok(files)
@@ -171,22 +173,19 @@ pub fn local_object_reader_stream(files: Vec<String>) ->
ObjectReaderStream {
/// Helper method to convert a file location to a `LocalFileReader`
pub fn local_object_reader(file: String) -> Arc<dyn ObjectReader> {
LocalFileSystem
- .file_reader(local_unpartitioned_file(file).file_meta.sized_file)
+ .file_reader(local_unpartitioned_file(file).sized_file)
.expect("File not found")
}
/// Helper method to fetch the file size and date at given path and create a
`FileMeta`
-pub fn local_unpartitioned_file(file: String) -> PartitionedFile {
+pub fn local_unpartitioned_file(file: String) -> FileMeta {
let metadata = fs::metadata(&file).expect("Local file metadata");
- PartitionedFile {
- file_meta: FileMeta {
- sized_file: SizedFile {
- size: metadata.len(),
- path: file,
- },
- last_modified:
metadata.modified().map(chrono::DateTime::from).ok(),
+ FileMeta {
+ sized_file: SizedFile {
+ size: metadata.len(),
+ path: file,
},
- partition_values: vec![],
+ last_modified: metadata.modified().map(chrono::DateTime::from).ok(),
}
}
diff --git a/datafusion-storage/src/object_store/mod.rs
b/datafusion-storage/src/object_store/mod.rs
index 0479648..5d2f76e 100644
--- a/datafusion-storage/src/object_store/mod.rs
+++ b/datafusion-storage/src/object_store/mod.rs
@@ -27,17 +27,12 @@ use std::sync::Arc;
use async_trait::async_trait;
use futures::{AsyncRead, Stream, StreamExt};
-use crate::{FileMeta, ListEntry, PartitionedFile, SizedFile};
-use datafusion_common::Result;
+use crate::{FileMeta, ListEntry, Result, SizedFile};
/// Stream of files listed from object store
pub type FileMetaStream =
Pin<Box<dyn Stream<Item = Result<FileMeta>> + Send + Sync + 'static>>;
-/// Stream of files get listed from object store
-pub type PartitionedFileStream =
- Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync +
'static>>;
-
/// Stream of list entries obtained from object store
pub type ListEntryStream =
Pin<Box<dyn Stream<Item = Result<ListEntry>> + Send + Sync + 'static>>;
diff --git a/datafusion/src/datasource/file_format/avro.rs
b/datafusion/src/datasource/file_format/avro.rs
index 0fc9c7a..f54650f 100644
--- a/datafusion/src/datasource/file_format/avro.rs
+++ b/datafusion/src/datasource/file_format/avro.rs
@@ -76,13 +76,13 @@ impl FileFormat for AvroFormat {
mod tests {
use crate::{
datafusion_storage::object_store::local::{
- local_object_reader, local_object_reader_stream,
local_unpartitioned_file,
- LocalFileSystem,
+ local_object_reader, local_object_reader_stream, LocalFileSystem,
},
physical_plan::collect,
};
use super::*;
+ use crate::datasource::listing::local_unpartitioned_file;
use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use crate::prelude::{SessionConfig, SessionContext};
use arrow::array::{
diff --git a/datafusion/src/datasource/file_format/csv.rs
b/datafusion/src/datasource/file_format/csv.rs
index 41f6df1..955bc0f 100644
--- a/datafusion/src/datasource/file_format/csv.rs
+++ b/datafusion/src/datasource/file_format/csv.rs
@@ -138,11 +138,11 @@ mod tests {
use arrow::array::StringArray;
use super::*;
+ use crate::datasource::listing::local_unpartitioned_file;
use crate::prelude::{SessionConfig, SessionContext};
use crate::{
datafusion_storage::object_store::local::{
- local_object_reader, local_object_reader_stream,
local_unpartitioned_file,
- LocalFileSystem,
+ local_object_reader, local_object_reader_stream, LocalFileSystem,
},
datasource::file_format::FileScanConfig,
physical_plan::collect,
@@ -271,7 +271,7 @@ mod tests {
let exec = format
.create_physical_plan(
FileScanConfig {
- object_store: Arc::new(LocalFileSystem {}),
+ object_store: Arc::new(LocalFileSystem),
file_schema,
file_groups,
statistics,
diff --git a/datafusion/src/datasource/file_format/json.rs
b/datafusion/src/datasource/file_format/json.rs
index 01e7982..0a347fa 100644
--- a/datafusion/src/datasource/file_format/json.rs
+++ b/datafusion/src/datasource/file_format/json.rs
@@ -103,10 +103,9 @@ mod tests {
use crate::prelude::{SessionConfig, SessionContext};
use crate::{
datafusion_storage::object_store::local::{
- local_object_reader, local_object_reader_stream,
local_unpartitioned_file,
- LocalFileSystem,
+ local_object_reader, local_object_reader_stream, LocalFileSystem,
},
- datasource::file_format::FileScanConfig,
+ datasource::{file_format::FileScanConfig,
listing::local_unpartitioned_file},
physical_plan::collect,
};
diff --git a/datafusion/src/datasource/file_format/parquet.rs
b/datafusion/src/datasource/file_format/parquet.rs
index 08998d2..374e360 100644
--- a/datafusion/src/datasource/file_format/parquet.rs
+++ b/datafusion/src/datasource/file_format/parquet.rs
@@ -89,6 +89,7 @@ impl FileFormat for ParquetFormat {
async fn infer_schema(&self, readers: ObjectReaderStream) ->
Result<SchemaRef> {
let merged_schema = readers
+ .map_err(DataFusionError::IoError)
.try_fold(Schema::empty(), |acc, reader| async {
let next_schema = fetch_schema(reader);
Schema::try_merge([acc, next_schema?])
@@ -351,16 +352,17 @@ impl ChunkReader for ChunkObjectReader {
fn get_read(&self, start: u64, length: usize) -> ParquetResult<Self::T> {
self.0
.sync_chunk_reader(start, length)
+ .map_err(DataFusionError::IoError)
.map_err(|e| ParquetError::ArrowError(e.to_string()))
}
}
#[cfg(test)]
mod tests {
+ use crate::datasource::listing::local_unpartitioned_file;
use crate::physical_plan::collect;
use datafusion_storage::object_store::local::{
- local_object_reader, local_object_reader_stream,
local_unpartitioned_file,
- LocalFileSystem,
+ local_object_reader, local_object_reader_stream, LocalFileSystem,
};
use super::*;
diff --git a/datafusion/src/datasource/listing/helpers.rs
b/datafusion/src/datasource/listing/helpers.rs
index 2657474..10b3b48 100644
--- a/datafusion/src/datasource/listing/helpers.rs
+++ b/datafusion/src/datasource/listing/helpers.rs
@@ -29,6 +29,7 @@ use arrow::{
record_batch::RecordBatch,
};
use chrono::{TimeZone, Utc};
+use datafusion_common::DataFusionError;
use futures::{
stream::{self},
StreamExt, TryStreamExt,
@@ -44,10 +45,8 @@ use crate::{
scalar::ScalarValue,
};
-use datafusion_storage::{
- object_store::{ObjectStore, PartitionedFileStream},
- FileMeta, PartitionedFile, SizedFile,
-};
+use super::{PartitionedFile, PartitionedFileStream};
+use datafusion_storage::{object_store::ObjectStore, FileMeta, SizedFile};
const FILE_SIZE_COLUMN_NAME: &str = "_df_part_file_size_";
const FILE_PATH_COLUMN_NAME: &str = "_df_part_file_path_";
@@ -234,7 +233,11 @@ pub async fn pruned_partition_list(
// store if we switch to a streaming-style pruning of the files.
For instance S3 lists
// 1000 items at a time so batches of 1000 would be ideal with S3
as store.
.chunks(1024)
- .map(|v| v.into_iter().collect::<Result<Vec<_>>>())
+ .map(|v| {
+ v.into_iter()
+ .collect::<datafusion_storage::Result<Vec<_>>>()
+ })
+ .map_err(DataFusionError::IoError)
.map(move |metas| paths_to_batch(table_partition_cols,
&stream_path, &metas?))
.try_collect()
.await?;
diff --git a/datafusion/src/datasource/listing/mod.rs
b/datafusion/src/datasource/listing/mod.rs
index 2f1d034..2a2042e 100644
--- a/datafusion/src/datasource/listing/mod.rs
+++ b/datafusion/src/datasource/listing/mod.rs
@@ -21,4 +21,51 @@
mod helpers;
mod table;
+use datafusion_common::ScalarValue;
+use datafusion_storage::{object_store::local, FileMeta, Result, SizedFile};
+use futures::Stream;
+use std::pin::Pin;
+
pub use table::{ListingOptions, ListingTable, ListingTableConfig};
+
+/// Stream of files get listed from object store
+pub type PartitionedFileStream =
+ Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync +
'static>>;
+
+#[derive(Debug, Clone)]
+/// A single file that should be read, along with its schema, statistics
+/// and partition column values that need to be appended to each row.
+pub struct PartitionedFile {
+ /// Path for the file (e.g. URL, filesystem path, etc)
+ pub file_meta: FileMeta,
+ /// Values of partition columns to be appended to each row
+ pub partition_values: Vec<ScalarValue>,
+ // We may include row group range here for a more fine-grained parallel
execution
+}
+
+impl PartitionedFile {
+ /// Create a simple file without metadata or partition
+ pub fn new(path: String, size: u64) -> Self {
+ Self {
+ file_meta: FileMeta {
+ sized_file: SizedFile { path, size },
+ last_modified: None,
+ },
+ partition_values: vec![],
+ }
+ }
+}
+
+impl std::fmt::Display for PartitionedFile {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ write!(f, "{}", self.file_meta)
+ }
+}
+
+/// Helper method to fetch the file size and date at given path and create a
`FileMeta`
+pub fn local_unpartitioned_file(file: String) -> PartitionedFile {
+ PartitionedFile {
+ file_meta: local::local_unpartitioned_file(file),
+ partition_values: vec![],
+ }
+}
diff --git a/datafusion/src/datasource/listing/table.rs
b/datafusion/src/datasource/listing/table.rs
index 440bbba..42b836c 100644
--- a/datafusion/src/datasource/listing/table.rs
+++ b/datafusion/src/datasource/listing/table.rs
@@ -42,7 +42,8 @@ use crate::datasource::{
get_statistics_with_limit, TableProvider,
};
-use datafusion_storage::{object_store::ObjectStore, PartitionedFile};
+use super::PartitionedFile;
+use datafusion_storage::object_store::ObjectStore;
use super::helpers::{expr_applicable_for_cols, pruned_partition_list,
split_files};
diff --git a/datafusion/src/datasource/mod.rs b/datafusion/src/datasource/mod.rs
index 24bc687..2a801ff 100644
--- a/datafusion/src/datasource/mod.rs
+++ b/datafusion/src/datasource/mod.rs
@@ -28,12 +28,12 @@ pub mod object_store_registry;
use futures::Stream;
pub use self::datasource::{TableProvider, TableType};
+use self::listing::PartitionedFile;
pub use self::memory::MemTable;
use crate::arrow::datatypes::{Schema, SchemaRef};
use crate::error::Result;
use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
use crate::physical_plan::{Accumulator, ColumnStatistics, Statistics};
-use datafusion_storage::PartitionedFile;
use futures::StreamExt;
/// Get all files as well as the file level summary statistics (no statistic
for partition columns).
diff --git a/datafusion/src/physical_optimizer/repartition.rs
b/datafusion/src/physical_optimizer/repartition.rs
index b0dbc77..d98fa41 100644
--- a/datafusion/src/physical_optimizer/repartition.rs
+++ b/datafusion/src/physical_optimizer/repartition.rs
@@ -238,7 +238,7 @@ mod tests {
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use super::*;
- use crate::datafusion_storage::PartitionedFile;
+ use crate::datasource::listing::PartitionedFile;
use crate::physical_plan::expressions::{col, PhysicalSortExpr};
use crate::physical_plan::file_format::{FileScanConfig, ParquetExec};
use crate::physical_plan::filter::FilterExec;
diff --git a/datafusion/src/physical_plan/file_format/avro.rs
b/datafusion/src/physical_plan/file_format/avro.rs
index 3ee23a9..3f6b857 100644
--- a/datafusion/src/physical_plan/file_format/avro.rs
+++ b/datafusion/src/physical_plan/file_format/avro.rs
@@ -174,9 +174,12 @@ impl ExecutionPlan for AvroExec {
#[cfg(test)]
#[cfg(feature = "avro")]
mod tests {
- use crate::datasource::file_format::{avro::AvroFormat, FileFormat};
use crate::datasource::object_store::local::{
- local_object_reader_stream, local_unpartitioned_file, LocalFileSystem,
+ local_object_reader_stream, LocalFileSystem,
+ };
+ use crate::datasource::{
+ file_format::{avro::AvroFormat, FileFormat},
+ listing::local_unpartitioned_file,
};
use crate::scalar::ScalarValue;
use arrow::datatypes::{DataType, Field, Schema};
diff --git a/datafusion/src/physical_plan/file_format/csv.rs
b/datafusion/src/physical_plan/file_format/csv.rs
index f124e70..28466b8 100644
--- a/datafusion/src/physical_plan/file_format/csv.rs
+++ b/datafusion/src/physical_plan/file_format/csv.rs
@@ -221,9 +221,8 @@ pub async fn plan_to_csv(
#[cfg(test)]
mod tests {
use super::*;
- use crate::datafusion_storage::object_store::local::{
- local_unpartitioned_file, LocalFileSystem,
- };
+ use crate::datafusion_storage::object_store::local::LocalFileSystem;
+ use crate::datasource::listing::local_unpartitioned_file;
use crate::prelude::*;
use crate::test_util::aggr_test_schema_with_missing_col;
use crate::{scalar::ScalarValue, test_util::aggr_test_schema};
diff --git a/datafusion/src/physical_plan/file_format/file_stream.rs
b/datafusion/src/physical_plan/file_format/file_stream.rs
index 6d8fa5f..78222ee 100644
--- a/datafusion/src/physical_plan/file_format/file_stream.rs
+++ b/datafusion/src/physical_plan/file_format/file_stream.rs
@@ -21,13 +21,14 @@
//! Note: Most traits here need to be marked `Sync + Send` to be
//! compliant with the `SendableRecordBatchStream` trait.
+use crate::datasource::listing::PartitionedFile;
use crate::{physical_plan::RecordBatchStream, scalar::ScalarValue};
use arrow::{
datatypes::SchemaRef,
error::{ArrowError, Result as ArrowResult},
record_batch::RecordBatch,
};
-use datafusion_storage::{object_store::ObjectStore, PartitionedFile};
+use datafusion_storage::object_store::ObjectStore;
use futures::Stream;
use std::{
io::Read,
diff --git a/datafusion/src/physical_plan/file_format/json.rs
b/datafusion/src/physical_plan/file_format/json.rs
index e96ff8b..4eb2fda 100644
--- a/datafusion/src/physical_plan/file_format/json.rs
+++ b/datafusion/src/physical_plan/file_format/json.rs
@@ -194,9 +194,12 @@ mod tests {
use futures::StreamExt;
use crate::datafusion_storage::object_store::local::{
- local_object_reader_stream, local_unpartitioned_file, LocalFileSystem,
+ local_object_reader_stream, LocalFileSystem,
+ };
+ use crate::datasource::{
+ file_format::{json::JsonFormat, FileFormat},
+ listing::local_unpartitioned_file,
};
- use crate::datasource::file_format::{json::JsonFormat, FileFormat};
use crate::prelude::NdJsonReadOptions;
use crate::prelude::*;
use tempfile::TempDir;
diff --git a/datafusion/src/physical_plan/file_format/mod.rs
b/datafusion/src/physical_plan/file_format/mod.rs
index 8e150e8..78d1dc2 100644
--- a/datafusion/src/physical_plan/file_format/mod.rs
+++ b/datafusion/src/physical_plan/file_format/mod.rs
@@ -38,12 +38,13 @@ pub use csv::CsvExec;
pub(crate) use json::plan_to_json;
pub use json::NdJsonExec;
+use crate::datasource::listing::PartitionedFile;
use crate::{
error::{DataFusionError, Result},
scalar::ScalarValue,
};
use arrow::array::{new_null_array, UInt16BufferBuilder};
-use datafusion_storage::{object_store::ObjectStore, PartitionedFile};
+use datafusion_storage::object_store::ObjectStore;
use lazy_static::lazy_static;
use log::info;
use std::{
diff --git a/datafusion/src/physical_plan/file_format/parquet.rs
b/datafusion/src/physical_plan/file_format/parquet.rs
index cda6727..c77e7d8 100644
--- a/datafusion/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/src/physical_plan/file_format/parquet.rs
@@ -41,7 +41,7 @@ use crate::{
};
use datafusion_common::Column;
use datafusion_expr::Expr;
-use datafusion_storage::{object_store::ObjectStore, PartitionedFile};
+use datafusion_storage::object_store::ObjectStore;
use arrow::{
array::ArrayRef,
@@ -66,6 +66,7 @@ use tokio::{
task,
};
+use crate::datasource::listing::PartitionedFile;
use crate::physical_plan::file_format::SchemaAdapter;
use async_trait::async_trait;
@@ -580,12 +581,13 @@ mod tests {
use crate::{
assert_batches_sorted_eq, assert_contains,
datafusion_storage::{
- object_store::local::{
- local_object_reader_stream, local_unpartitioned_file,
LocalFileSystem,
- },
+ object_store::local::{local_object_reader_stream, LocalFileSystem},
FileMeta, SizedFile,
},
- datasource::file_format::{parquet::ParquetFormat, FileFormat},
+ datasource::{
+ file_format::{parquet::ParquetFormat, FileFormat},
+ listing::local_unpartitioned_file,
+ },
physical_plan::collect,
};
diff --git a/datafusion/src/test/mod.rs b/datafusion/src/test/mod.rs
index b96d981..4bdf6d6 100644
--- a/datafusion/src/test/mod.rs
+++ b/datafusion/src/test/mod.rs
@@ -18,10 +18,10 @@
//! Common unit test utility methods
use crate::arrow::array::UInt32Array;
-use crate::datafusion_storage::{
- object_store::local::local_unpartitioned_file, PartitionedFile,
+use crate::datasource::{
+ listing::{local_unpartitioned_file, PartitionedFile},
+ MemTable, TableProvider,
};
-use crate::datasource::{MemTable, TableProvider};
use crate::error::Result;
use crate::from_slice::FromSlice;
use crate::logical_plan::{LogicalPlan, LogicalPlanBuilder};
diff --git a/datafusion/src/test/object_store.rs
b/datafusion/src/test/object_store.rs
index 9646890..61b4265 100644
--- a/datafusion/src/test/object_store.rs
+++ b/datafusion/src/test/object_store.rs
@@ -22,12 +22,9 @@ use std::{
sync::Arc,
};
-use crate::{
- datafusion_storage::{
- object_store::{FileMetaStream, ListEntryStream, ObjectReader,
ObjectStore},
- FileMeta, SizedFile,
- },
- error::{DataFusionError, Result},
+use crate::datafusion_storage::{
+ object_store::{FileMetaStream, ListEntryStream, ObjectReader, ObjectStore},
+ FileMeta, Result, SizedFile,
};
use async_trait::async_trait;
use futures::{stream, AsyncRead, StreamExt};
@@ -84,14 +81,14 @@ impl ObjectStore for TestObjectStore {
Some((_, size)) if *size == file.size => {
Ok(Arc::new(EmptyObjectReader(*size)))
}
- Some(_) => Err(DataFusionError::IoError(io::Error::new(
+ Some(_) => Err(io::Error::new(
io::ErrorKind::NotFound,
"found in test list but wrong size",
- ))),
- None => Err(DataFusionError::IoError(io::Error::new(
+ )),
+ None => Err(io::Error::new(
io::ErrorKind::NotFound,
"not in provided test list",
- ))),
+ )),
}
}
}
diff --git a/datafusion/tests/path_partition.rs
b/datafusion/tests/path_partition.rs
index 3e46049..8475ea5 100644
--- a/datafusion/tests/path_partition.rs
+++ b/datafusion/tests/path_partition.rs
@@ -33,7 +33,7 @@ use datafusion::{
file_format::{csv::CsvFormat, parquet::ParquetFormat},
listing::{ListingOptions, ListingTable, ListingTableConfig},
},
- error::{DataFusionError, Result},
+ error::Result,
physical_plan::ColumnStatistics,
prelude::SessionContext,
test_util::{self, arrow_test_data, parquet_test_data},
@@ -352,7 +352,10 @@ impl MirroringObjectStore {
#[async_trait]
impl ObjectStore for MirroringObjectStore {
- async fn list_file(&self, prefix: &str) -> Result<FileMetaStream> {
+ async fn list_file(
+ &self,
+ prefix: &str,
+ ) -> datafusion_storage::Result<FileMetaStream> {
let prefix = prefix.to_owned();
let size = self.file_size;
Ok(Box::pin(
@@ -375,11 +378,14 @@ impl ObjectStore for MirroringObjectStore {
&self,
_prefix: &str,
_delimiter: Option<String>,
- ) -> Result<ListEntryStream> {
+ ) -> datafusion_storage::Result<ListEntryStream> {
unimplemented!()
}
- fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>> {
+ fn file_reader(
+ &self,
+ file: SizedFile,
+ ) -> datafusion_storage::Result<Arc<dyn ObjectReader>> {
assert_eq!(
self.file_size, file.size,
"Requested files should have the same size as the mirrored file"
@@ -389,10 +395,10 @@ impl ObjectStore for MirroringObjectStore {
path: self.mirrored_file.clone(),
size: self.file_size,
})?),
- None => Err(DataFusionError::IoError(io::Error::new(
+ None => Err(io::Error::new(
io::ErrorKind::NotFound,
"not in provided test list",
- ))),
+ )),
}
}
}