This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 1186983854 Move `FileCompressionType` out of `common` and into `core`
(#7596)
1186983854 is described below
commit 118698385409ade9f5b0b81bb6ec6e48113049ec
Author: Huaijin <[email protected]>
AuthorDate: Thu Sep 21 04:41:57 2023 +0800
Move `FileCompressionType` out of `common` and into `core` (#7596)
* move FileCompressionType out common and into core
* fix ci
---
datafusion-cli/Cargo.lock | 9 -
datafusion-examples/examples/csv_opener.rs | 2 +-
datafusion-examples/examples/csv_sql.rs | 2 +-
datafusion-examples/examples/json_opener.rs | 2 +-
datafusion/common/Cargo.toml | 17 +-
datafusion/common/src/file_options/file_type.rs | 295 +--------------------
datafusion/common/src/lib.rs | 5 +-
datafusion/core/src/datasource/file_format/csv.rs | 4 +-
.../file_format/file_compression_type.rs} | 111 +-------
datafusion/core/src/datasource/file_format/json.rs | 2 +-
datafusion/core/src/datasource/file_format/mod.rs | 1 +
.../core/src/datasource/file_format/options.rs | 5 +-
.../core/src/datasource/file_format/parquet.rs | 5 +-
.../core/src/datasource/file_format/write.rs | 3 +-
datafusion/core/src/datasource/listing/table.rs | 9 +-
.../core/src/datasource/listing_table_factory.rs | 3 +-
.../core/src/datasource/physical_plan/csv.rs | 2 +-
.../core/src/datasource/physical_plan/json.rs | 3 +-
.../src/physical_optimizer/enforce_distribution.rs | 3 +-
.../replace_with_order_preserving_variants.rs | 2 +-
datafusion/core/src/test/mod.rs | 5 +-
datafusion/proto/src/physical_plan/mod.rs | 2 +-
22 files changed, 50 insertions(+), 442 deletions(-)
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index 4e3d0c8850..74302f2ddc 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -1124,20 +1124,11 @@ version = "31.0.0"
dependencies = [
"arrow",
"arrow-array",
- "async-compression",
- "bytes",
- "bzip2",
"chrono",
- "flate2",
- "futures",
"num_cpus",
"object_store",
"parquet",
"sqlparser",
- "tokio",
- "tokio-util",
- "xz2",
- "zstd",
]
[[package]]
diff --git a/datafusion-examples/examples/csv_opener.rs
b/datafusion-examples/examples/csv_opener.rs
index 0587b515b2..5126666d5e 100644
--- a/datafusion-examples/examples/csv_opener.rs
+++ b/datafusion-examples/examples/csv_opener.rs
@@ -20,6 +20,7 @@ use std::{sync::Arc, vec};
use datafusion::{
assert_batches_eq,
datasource::{
+ file_format::file_compression_type::FileCompressionType,
listing::PartitionedFile,
object_store::ObjectStoreUrl,
physical_plan::{CsvConfig, CsvOpener, FileScanConfig, FileStream},
@@ -28,7 +29,6 @@ use datafusion::{
physical_plan::metrics::ExecutionPlanMetricsSet,
test_util::aggr_test_schema,
};
-use datafusion_common::FileCompressionType;
use futures::StreamExt;
use object_store::local::LocalFileSystem;
diff --git a/datafusion-examples/examples/csv_sql.rs
b/datafusion-examples/examples/csv_sql.rs
index a9aa24c6aa..851fdcb626 100644
--- a/datafusion-examples/examples/csv_sql.rs
+++ b/datafusion-examples/examples/csv_sql.rs
@@ -15,9 +15,9 @@
// specific language governing permissions and limitations
// under the License.
+use
datafusion::datasource::file_format::file_compression_type::FileCompressionType;
use datafusion::error::Result;
use datafusion::prelude::*;
-use datafusion_common::FileCompressionType;
/// This example demonstrates executing a simple query against an Arrow data
source (CSV) and
/// fetching results
diff --git a/datafusion-examples/examples/json_opener.rs
b/datafusion-examples/examples/json_opener.rs
index 71bb0ae4fe..74ba6f3852 100644
--- a/datafusion-examples/examples/json_opener.rs
+++ b/datafusion-examples/examples/json_opener.rs
@@ -21,6 +21,7 @@ use arrow_schema::{DataType, Field, Schema};
use datafusion::{
assert_batches_eq,
datasource::{
+ file_format::file_compression_type::FileCompressionType,
listing::PartitionedFile,
object_store::ObjectStoreUrl,
physical_plan::{FileScanConfig, FileStream, JsonOpener},
@@ -28,7 +29,6 @@ use datafusion::{
error::Result,
physical_plan::metrics::ExecutionPlanMetricsSet,
};
-use datafusion_common::FileCompressionType;
use futures::StreamExt;
use object_store::ObjectStore;
diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml
index ee06d2103a..a46649c800 100644
--- a/datafusion/common/Cargo.toml
+++ b/datafusion/common/Cargo.toml
@@ -35,34 +35,19 @@ path = "src/lib.rs"
[features]
avro = ["apache-avro"]
backtrace = []
-compression = ["xz2", "bzip2", "flate2", "zstd", "async-compression"]
-default = ["compression", "parquet"]
+default = ["parquet"]
pyarrow = ["pyo3", "arrow/pyarrow"]
[dependencies]
apache-avro = { version = "0.15", default-features = false, features =
["snappy"], optional = true }
arrow = { workspace = true }
arrow-array = { workspace = true }
-async-compression = { version = "0.4.0", features = ["bzip2", "gzip", "xz",
"zstd", "futures-io", "tokio"], optional = true }
-bytes = "1.4"
-bzip2 = { version = "0.4.3", optional = true }
chrono = { workspace = true }
-flate2 = { version = "1.0.24", optional = true }
-futures = "0.3"
num_cpus = "1.13.0"
object_store = { version = "0.7.0", default-features = false, optional = true }
parquet = { workspace = true, optional = true }
pyo3 = { version = "0.19.0", optional = true }
sqlparser = { workspace = true }
-tokio = { version = "1.28", features = ["macros", "rt", "rt-multi-thread",
"sync", "fs", "parking_lot"] }
-tokio-util = { version = "0.7.4", features = ["io"] }
-xz2 = { version = "0.1", optional = true }
-zstd = { version = "0.12", optional = true }
-
-
-
-
-
[dev-dependencies]
rand = "0.8.4"
diff --git a/datafusion/common/src/file_options/file_type.rs
b/datafusion/common/src/file_options/file_type.rs
index 7d12a267ca..c83da387c2 100644
--- a/datafusion/common/src/file_options/file_type.rs
+++ b/datafusion/common/src/file_options/file_type.rs
@@ -18,38 +18,10 @@
//! File type abstraction
use crate::error::{DataFusionError, Result};
-#[cfg(feature = "compression")]
-use async_compression::tokio::bufread::{
- BzDecoder as AsyncBzDecoder, BzEncoder as AsyncBzEncoder,
- GzipDecoder as AsyncGzDecoder, GzipEncoder as AsyncGzEncoder,
- XzDecoder as AsyncXzDecoder, XzEncoder as AsyncXzEncoder,
- ZstdDecoder as AsyncZstdDecoer, ZstdEncoder as AsyncZstdEncoder,
-};
-
-use crate::parsers::CompressionTypeVariant;
-#[cfg(feature = "compression")]
-use async_compression::tokio::write::{BzEncoder, GzipEncoder, XzEncoder,
ZstdEncoder};
-use bytes::Bytes;
-#[cfg(feature = "compression")]
-use bzip2::read::MultiBzDecoder;
-#[cfg(feature = "compression")]
-use flate2::read::MultiGzDecoder;
use core::fmt;
-use futures::stream::BoxStream;
-use futures::StreamExt;
-#[cfg(feature = "compression")]
-use futures::TryStreamExt;
use std::fmt::Display;
use std::str::FromStr;
-use tokio::io::AsyncWrite;
-#[cfg(feature = "compression")]
-use tokio_util::io::{ReaderStream, StreamReader};
-#[cfg(feature = "compression")]
-use xz2::read::XzDecoder;
-#[cfg(feature = "compression")]
-use zstd::Decoder as ZstdDecoder;
-use CompressionTypeVariant::*;
/// The default file extension of arrow files
pub const DEFAULT_ARROW_EXTENSION: &str = ".arrow";
@@ -68,182 +40,6 @@ pub trait GetExt {
fn get_ext(&self) -> String;
}
-/// Readable file compression type
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub struct FileCompressionType {
- variant: CompressionTypeVariant,
-}
-
-impl GetExt for FileCompressionType {
- fn get_ext(&self) -> String {
- match self.variant {
- GZIP => ".gz".to_owned(),
- BZIP2 => ".bz2".to_owned(),
- XZ => ".xz".to_owned(),
- ZSTD => ".zst".to_owned(),
- UNCOMPRESSED => "".to_owned(),
- }
- }
-}
-
-impl From<CompressionTypeVariant> for FileCompressionType {
- fn from(t: CompressionTypeVariant) -> Self {
- Self { variant: t }
- }
-}
-
-impl FromStr for FileCompressionType {
- type Err = DataFusionError;
-
- fn from_str(s: &str) -> Result<Self> {
- let variant = CompressionTypeVariant::from_str(s).map_err(|_| {
- DataFusionError::NotImplemented(format!("Unknown
FileCompressionType: {s}"))
- })?;
- Ok(Self { variant })
- }
-}
-
-/// `FileCompressionType` implementation
-impl FileCompressionType {
- /// Gzip-ed file
- pub const GZIP: Self = Self { variant: GZIP };
-
- /// Bzip2-ed file
- pub const BZIP2: Self = Self { variant: BZIP2 };
-
- /// Xz-ed file (liblzma)
- pub const XZ: Self = Self { variant: XZ };
-
- /// Zstd-ed file
- pub const ZSTD: Self = Self { variant: ZSTD };
-
- /// Uncompressed file
- pub const UNCOMPRESSED: Self = Self {
- variant: UNCOMPRESSED,
- };
-
- /// The file is compressed or not
- pub const fn is_compressed(&self) -> bool {
- self.variant.is_compressed()
- }
-
- /// Given a `Stream`, create a `Stream` which data are compressed with
`FileCompressionType`.
- pub fn convert_to_compress_stream(
- &self,
- s: BoxStream<'static, Result<Bytes>>,
- ) -> Result<BoxStream<'static, Result<Bytes>>> {
- Ok(match self.variant {
- #[cfg(feature = "compression")]
- GZIP =>
ReaderStream::new(AsyncGzEncoder::new(StreamReader::new(s)))
- .map_err(DataFusionError::from)
- .boxed(),
- #[cfg(feature = "compression")]
- BZIP2 =>
ReaderStream::new(AsyncBzEncoder::new(StreamReader::new(s)))
- .map_err(DataFusionError::from)
- .boxed(),
- #[cfg(feature = "compression")]
- XZ => ReaderStream::new(AsyncXzEncoder::new(StreamReader::new(s)))
- .map_err(DataFusionError::from)
- .boxed(),
- #[cfg(feature = "compression")]
- ZSTD =>
ReaderStream::new(AsyncZstdEncoder::new(StreamReader::new(s)))
- .map_err(DataFusionError::from)
- .boxed(),
- #[cfg(not(feature = "compression"))]
- GZIP | BZIP2 | XZ | ZSTD => {
- return Err(DataFusionError::NotImplemented(
- "Compression feature is not enabled".to_owned(),
- ))
- }
- UNCOMPRESSED => s.boxed(),
- })
- }
-
- /// Wrap the given `AsyncWrite` so that it performs compressed writes
- /// according to this `FileCompressionType`.
- pub fn convert_async_writer(
- &self,
- w: Box<dyn AsyncWrite + Send + Unpin>,
- ) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
- Ok(match self.variant {
- #[cfg(feature = "compression")]
- GZIP => Box::new(GzipEncoder::new(w)),
- #[cfg(feature = "compression")]
- BZIP2 => Box::new(BzEncoder::new(w)),
- #[cfg(feature = "compression")]
- XZ => Box::new(XzEncoder::new(w)),
- #[cfg(feature = "compression")]
- ZSTD => Box::new(ZstdEncoder::new(w)),
- #[cfg(not(feature = "compression"))]
- GZIP | BZIP2 | XZ | ZSTD => {
- return Err(DataFusionError::NotImplemented(
- "Compression feature is not enabled".to_owned(),
- ))
- }
- UNCOMPRESSED => w,
- })
- }
-
- /// Given a `Stream`, create a `Stream` which data are decompressed with
`FileCompressionType`.
- pub fn convert_stream(
- &self,
- s: BoxStream<'static, Result<Bytes>>,
- ) -> Result<BoxStream<'static, Result<Bytes>>> {
- Ok(match self.variant {
- #[cfg(feature = "compression")]
- GZIP =>
ReaderStream::new(AsyncGzDecoder::new(StreamReader::new(s)))
- .map_err(DataFusionError::from)
- .boxed(),
- #[cfg(feature = "compression")]
- BZIP2 =>
ReaderStream::new(AsyncBzDecoder::new(StreamReader::new(s)))
- .map_err(DataFusionError::from)
- .boxed(),
- #[cfg(feature = "compression")]
- XZ => ReaderStream::new(AsyncXzDecoder::new(StreamReader::new(s)))
- .map_err(DataFusionError::from)
- .boxed(),
- #[cfg(feature = "compression")]
- ZSTD =>
ReaderStream::new(AsyncZstdDecoer::new(StreamReader::new(s)))
- .map_err(DataFusionError::from)
- .boxed(),
- #[cfg(not(feature = "compression"))]
- GZIP | BZIP2 | XZ | ZSTD => {
- return Err(DataFusionError::NotImplemented(
- "Compression feature is not enabled".to_owned(),
- ))
- }
- UNCOMPRESSED => s.boxed(),
- })
- }
-
- /// Given a `Read`, create a `Read` which data are decompressed with
`FileCompressionType`.
- pub fn convert_read<T: std::io::Read + Send + 'static>(
- &self,
- r: T,
- ) -> Result<Box<dyn std::io::Read + Send>> {
- Ok(match self.variant {
- #[cfg(feature = "compression")]
- GZIP => Box::new(MultiGzDecoder::new(r)),
- #[cfg(feature = "compression")]
- BZIP2 => Box::new(MultiBzDecoder::new(r)),
- #[cfg(feature = "compression")]
- XZ => Box::new(XzDecoder::new_multi_decoder(r)),
- #[cfg(feature = "compression")]
- ZSTD => match ZstdDecoder::new(r) {
- Ok(decoder) => Box::new(decoder),
- Err(e) => return Err(DataFusionError::External(Box::new(e))),
- },
- #[cfg(not(feature = "compression"))]
- GZIP | BZIP2 | XZ | ZSTD => {
- return Err(DataFusionError::NotImplemented(
- "Compression feature is not enabled".to_owned(),
- ))
- }
- UNCOMPRESSED => Box::new(r),
- })
- }
-}
-
/// Readable file type
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum FileType {
@@ -302,73 +98,12 @@ impl FromStr for FileType {
}
}
-impl FileType {
- /// Given a `FileCompressionType`, return the `FileType`'s extension with
compression suffix
- pub fn get_ext_with_compression(&self, c: FileCompressionType) ->
Result<String> {
- let ext = self.get_ext();
-
- match self {
- FileType::JSON | FileType::CSV => Ok(format!("{}{}", ext,
c.get_ext())),
- FileType::PARQUET | FileType::AVRO | FileType::ARROW => match
c.variant {
- UNCOMPRESSED => Ok(ext),
- _ => Err(DataFusionError::Internal(
- "FileCompressionType can be specified for CSV/JSON
FileType.".into(),
- )),
- },
- }
- }
-}
-
#[cfg(test)]
mod tests {
use crate::error::DataFusionError;
- use crate::file_options::file_type::{FileCompressionType, FileType};
+ use crate::file_options::FileType;
use std::str::FromStr;
- #[test]
- fn get_ext_with_compression() {
- for (file_type, compression, extension) in [
- (FileType::CSV, FileCompressionType::UNCOMPRESSED, ".csv"),
- (FileType::CSV, FileCompressionType::GZIP, ".csv.gz"),
- (FileType::CSV, FileCompressionType::XZ, ".csv.xz"),
- (FileType::CSV, FileCompressionType::BZIP2, ".csv.bz2"),
- (FileType::CSV, FileCompressionType::ZSTD, ".csv.zst"),
- (FileType::JSON, FileCompressionType::UNCOMPRESSED, ".json"),
- (FileType::JSON, FileCompressionType::GZIP, ".json.gz"),
- (FileType::JSON, FileCompressionType::XZ, ".json.xz"),
- (FileType::JSON, FileCompressionType::BZIP2, ".json.bz2"),
- (FileType::JSON, FileCompressionType::ZSTD, ".json.zst"),
- ] {
- assert_eq!(
- file_type.get_ext_with_compression(compression).unwrap(),
- extension
- );
- }
-
- // Cannot specify compression for these file types
- for (file_type, extension) in
- [(FileType::AVRO, ".avro"), (FileType::PARQUET, ".parquet")]
- {
- assert_eq!(
- file_type
-
.get_ext_with_compression(FileCompressionType::UNCOMPRESSED)
- .unwrap(),
- extension
- );
- for compression in [
- FileCompressionType::GZIP,
- FileCompressionType::XZ,
- FileCompressionType::BZIP2,
- FileCompressionType::ZSTD,
- ] {
- assert!(matches!(
- file_type.get_ext_with_compression(compression),
- Err(DataFusionError::Internal(_))
- ));
- }
- }
- }
-
#[test]
fn from_str() {
for (ext, file_type) in [
@@ -388,33 +123,5 @@ mod tests {
FileType::from_str("Unknown"),
Err(DataFusionError::NotImplemented(_))
));
-
- for (ext, compression_type) in [
- ("gz", FileCompressionType::GZIP),
- ("GZ", FileCompressionType::GZIP),
- ("gzip", FileCompressionType::GZIP),
- ("GZIP", FileCompressionType::GZIP),
- ("xz", FileCompressionType::XZ),
- ("XZ", FileCompressionType::XZ),
- ("bz2", FileCompressionType::BZIP2),
- ("BZ2", FileCompressionType::BZIP2),
- ("bzip2", FileCompressionType::BZIP2),
- ("BZIP2", FileCompressionType::BZIP2),
- ("zst", FileCompressionType::ZSTD),
- ("ZST", FileCompressionType::ZSTD),
- ("zstd", FileCompressionType::ZSTD),
- ("ZSTD", FileCompressionType::ZSTD),
- ("", FileCompressionType::UNCOMPRESSED),
- ] {
- assert_eq!(
- FileCompressionType::from_str(ext).unwrap(),
- compression_type
- );
- }
-
- assert!(matches!(
- FileCompressionType::from_str("Unknown"),
- Err(DataFusionError::NotImplemented(_))
- ));
}
}
diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs
index 420bcd963c..eeb5b26813 100644
--- a/datafusion/common/src/lib.rs
+++ b/datafusion/common/src/lib.rs
@@ -46,9 +46,8 @@ pub use error::{
};
pub use file_options::file_type::{
- FileCompressionType, FileType, GetExt, DEFAULT_ARROW_EXTENSION,
- DEFAULT_AVRO_EXTENSION, DEFAULT_CSV_EXTENSION, DEFAULT_JSON_EXTENSION,
- DEFAULT_PARQUET_EXTENSION,
+ FileType, GetExt, DEFAULT_ARROW_EXTENSION, DEFAULT_AVRO_EXTENSION,
+ DEFAULT_CSV_EXTENSION, DEFAULT_JSON_EXTENSION, DEFAULT_PARQUET_EXTENSION,
};
pub use file_options::FileTypeWriterOptions;
pub use functional_dependencies::{
diff --git a/datafusion/core/src/datasource/file_format/csv.rs
b/datafusion/core/src/datasource/file_format/csv.rs
index 4578ab5a43..897174659e 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -38,6 +38,7 @@ use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use object_store::{delimited::newline_delimited_stream, ObjectMeta,
ObjectStore};
use super::{FileFormat, DEFAULT_SCHEMA_INFER_MAX_RECORD};
+use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::file_format::write::{
create_writer, stateless_serialize_and_write_files, BatchSerializer,
FileWriterMode,
};
@@ -49,7 +50,6 @@ use crate::execution::context::SessionState;
use crate::physical_plan::insert::{DataSink, FileSinkExec};
use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};
use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
-use datafusion_common::FileCompressionType;
use rand::distributions::{Alphanumeric, DistString};
/// Character Separated Value `FileFormat` implementation.
@@ -599,6 +599,7 @@ mod tests {
use super::*;
use crate::arrow::util::pretty;
use crate::assert_batches_eq;
+ use
crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::file_format::test_util::VariableStream;
use crate::datasource::listing::ListingOptions;
use crate::physical_plan::collect;
@@ -609,7 +610,6 @@ mod tests {
use chrono::DateTime;
use datafusion_common::cast::as_string_array;
use datafusion_common::internal_err;
- use datafusion_common::FileCompressionType;
use datafusion_common::FileType;
use datafusion_common::GetExt;
use datafusion_expr::{col, lit};
diff --git a/datafusion/common/src/file_options/file_type.rs
b/datafusion/core/src/datasource/file_format/file_compression_type.rs
similarity index 79%
copy from datafusion/common/src/file_options/file_type.rs
copy to datafusion/core/src/datasource/file_format/file_compression_type.rs
index 7d12a267ca..bd28687670 100644
--- a/datafusion/common/src/file_options/file_type.rs
+++ b/datafusion/core/src/datasource/file_format/file_compression_type.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-//! File type abstraction
+//! File Compression type abstraction
use crate::error::{DataFusionError, Result};
#[cfg(feature = "compression")]
@@ -26,21 +26,19 @@ use async_compression::tokio::bufread::{
ZstdDecoder as AsyncZstdDecoer, ZstdEncoder as AsyncZstdEncoder,
};
-use crate::parsers::CompressionTypeVariant;
#[cfg(feature = "compression")]
use async_compression::tokio::write::{BzEncoder, GzipEncoder, XzEncoder,
ZstdEncoder};
use bytes::Bytes;
#[cfg(feature = "compression")]
use bzip2::read::MultiBzDecoder;
+use datafusion_common::{parsers::CompressionTypeVariant, FileType, GetExt};
#[cfg(feature = "compression")]
use flate2::read::MultiGzDecoder;
-use core::fmt;
use futures::stream::BoxStream;
use futures::StreamExt;
#[cfg(feature = "compression")]
use futures::TryStreamExt;
-use std::fmt::Display;
use std::str::FromStr;
use tokio::io::AsyncWrite;
#[cfg(feature = "compression")]
@@ -51,23 +49,6 @@ use xz2::read::XzDecoder;
use zstd::Decoder as ZstdDecoder;
use CompressionTypeVariant::*;
-/// The default file extension of arrow files
-pub const DEFAULT_ARROW_EXTENSION: &str = ".arrow";
-/// The default file extension of avro files
-pub const DEFAULT_AVRO_EXTENSION: &str = ".avro";
-/// The default file extension of csv files
-pub const DEFAULT_CSV_EXTENSION: &str = ".csv";
-/// The default file extension of json files
-pub const DEFAULT_JSON_EXTENSION: &str = ".json";
-/// The default file extension of parquet files
-pub const DEFAULT_PARQUET_EXTENSION: &str = ".parquet";
-
-/// Define each `FileType`/`FileCompressionType`'s extension
-pub trait GetExt {
- /// File extension getter
- fn get_ext(&self) -> String;
-}
-
/// Readable file compression type
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FileCompressionType {
@@ -244,67 +225,14 @@ impl FileCompressionType {
}
}
-/// Readable file type
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
-pub enum FileType {
- /// Apache Arrow file
- ARROW,
- /// Apache Avro file
- AVRO,
- /// Apache Parquet file
- PARQUET,
- /// CSV file
- CSV,
- /// JSON file
- JSON,
-}
-
-impl GetExt for FileType {
- fn get_ext(&self) -> String {
- match self {
- FileType::ARROW => DEFAULT_ARROW_EXTENSION.to_owned(),
- FileType::AVRO => DEFAULT_AVRO_EXTENSION.to_owned(),
- FileType::PARQUET => DEFAULT_PARQUET_EXTENSION.to_owned(),
- FileType::CSV => DEFAULT_CSV_EXTENSION.to_owned(),
- FileType::JSON => DEFAULT_JSON_EXTENSION.to_owned(),
- }
- }
-}
-
-impl Display for FileType {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- let out = match self {
- FileType::CSV => "csv",
- FileType::JSON => "json",
- FileType::PARQUET => "parquet",
- FileType::AVRO => "avro",
- FileType::ARROW => "arrow",
- };
- write!(f, "{}", out)
- }
-}
-
-impl FromStr for FileType {
- type Err = DataFusionError;
-
- fn from_str(s: &str) -> Result<Self> {
- let s = s.to_uppercase();
- match s.as_str() {
- "ARROW" => Ok(FileType::ARROW),
- "AVRO" => Ok(FileType::AVRO),
- "PARQUET" => Ok(FileType::PARQUET),
- "CSV" => Ok(FileType::CSV),
- "JSON" | "NDJSON" => Ok(FileType::JSON),
- _ => Err(DataFusionError::NotImplemented(format!(
- "Unknown FileType: {s}"
- ))),
- }
- }
+/// Trait for extending the functionality of the `FileType` enum.
+pub trait FileTypeExt {
+ /// Given a `FileCompressionType`, return the `FileType`'s extension with
compression suffix
+ fn get_ext_with_compression(&self, c: FileCompressionType) ->
Result<String>;
}
-impl FileType {
- /// Given a `FileCompressionType`, return the `FileType`'s extension with
compression suffix
- pub fn get_ext_with_compression(&self, c: FileCompressionType) ->
Result<String> {
+impl FileTypeExt for FileType {
+ fn get_ext_with_compression(&self, c: FileCompressionType) ->
Result<String> {
let ext = self.get_ext();
match self {
@@ -321,8 +249,11 @@ impl FileType {
#[cfg(test)]
mod tests {
+ use crate::datasource::file_format::file_compression_type::{
+ FileCompressionType, FileTypeExt,
+ };
use crate::error::DataFusionError;
- use crate::file_options::file_type::{FileCompressionType, FileType};
+ use datafusion_common::file_options::file_type::FileType;
use std::str::FromStr;
#[test]
@@ -371,24 +302,6 @@ mod tests {
#[test]
fn from_str() {
- for (ext, file_type) in [
- ("csv", FileType::CSV),
- ("CSV", FileType::CSV),
- ("json", FileType::JSON),
- ("JSON", FileType::JSON),
- ("avro", FileType::AVRO),
- ("AVRO", FileType::AVRO),
- ("parquet", FileType::PARQUET),
- ("PARQUET", FileType::PARQUET),
- ] {
- assert_eq!(FileType::from_str(ext).unwrap(), file_type);
- }
-
- assert!(matches!(
- FileType::from_str("Unknown"),
- Err(DataFusionError::NotImplemented(_))
- ));
-
for (ext, compression_type) in [
("gz", FileCompressionType::GZIP),
("GZ", FileCompressionType::GZIP),
diff --git a/datafusion/core/src/datasource/file_format/json.rs
b/datafusion/core/src/datasource/file_format/json.rs
index 0aa87a9a32..c715317a95 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -51,6 +51,7 @@ use crate::physical_plan::{DisplayAs, DisplayFormatType,
Statistics};
use super::FileFormat;
use super::FileScanConfig;
+use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::file_format::write::{
create_writer, stateless_serialize_and_write_files, BatchSerializer,
FileWriterMode,
};
@@ -60,7 +61,6 @@ use crate::datasource::physical_plan::NdJsonExec;
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::ExecutionPlan;
-use datafusion_common::FileCompressionType;
/// New line delimited JSON `FileFormat` implementation.
#[derive(Debug)]
diff --git a/datafusion/core/src/datasource/file_format/mod.rs
b/datafusion/core/src/datasource/file_format/mod.rs
index a3ce74eeee..86f265ab94 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -24,6 +24,7 @@ pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000;
pub mod arrow;
pub mod avro;
pub mod csv;
+pub mod file_compression_type;
pub mod json;
pub mod options;
pub mod parquet;
diff --git a/datafusion/core/src/datasource/file_format/options.rs
b/datafusion/core/src/datasource/file_format/options.rs
index 5289adc6f2..40d9878a01 100644
--- a/datafusion/core/src/datasource/file_format/options.rs
+++ b/datafusion/core/src/datasource/file_format/options.rs
@@ -24,6 +24,7 @@ use async_trait::async_trait;
use datafusion_common::{plan_err, DataFusionError};
use crate::datasource::file_format::arrow::ArrowFormat;
+use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
use crate::datasource::listing::{ListingTableInsertMode, ListingTableUrl};
use crate::datasource::{
@@ -36,8 +37,8 @@ use crate::error::Result;
use crate::execution::context::{SessionConfig, SessionState};
use crate::logical_expr::Expr;
use datafusion_common::{
- FileCompressionType, DEFAULT_ARROW_EXTENSION, DEFAULT_AVRO_EXTENSION,
- DEFAULT_CSV_EXTENSION, DEFAULT_JSON_EXTENSION, DEFAULT_PARQUET_EXTENSION,
+ DEFAULT_ARROW_EXTENSION, DEFAULT_AVRO_EXTENSION, DEFAULT_CSV_EXTENSION,
+ DEFAULT_JSON_EXTENSION, DEFAULT_PARQUET_EXTENSION,
};
/// Options that control the reading of CSV files.
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs
b/datafusion/core/src/datasource/file_format/parquet.rs
index 3b15ccd17a..c645dc20da 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -29,13 +29,12 @@ use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::task::{JoinHandle, JoinSet};
+use crate::datasource::file_format::file_compression_type::FileCompressionType;
use arrow::datatypes::SchemaRef;
use arrow::datatypes::{Fields, Schema};
use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
-use datafusion_common::{
- exec_err, not_impl_err, plan_err, DataFusionError, FileCompressionType,
FileType,
-};
+use datafusion_common::{exec_err, not_impl_err, plan_err, DataFusionError,
FileType};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::PhysicalExpr;
use futures::{StreamExt, TryStreamExt};
diff --git a/datafusion/core/src/datasource/file_format/write.rs
b/datafusion/core/src/datasource/file_format/write.rs
index 222fe5b519..42d18eef63 100644
--- a/datafusion/core/src/datasource/file_format/write.rs
+++ b/datafusion/core/src/datasource/file_format/write.rs
@@ -24,12 +24,13 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
+use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::physical_plan::FileMeta;
use crate::error::Result;
use crate::physical_plan::SendableRecordBatchStream;
use arrow_array::RecordBatch;
-use datafusion_common::{exec_err, internal_err, DataFusionError,
FileCompressionType};
+use datafusion_common::{exec_err, internal_err, DataFusionError};
use async_trait::async_trait;
use bytes::Bytes;
diff --git a/datafusion/core/src/datasource/listing/table.rs
b/datafusion/core/src/datasource/listing/table.rs
index c84cdde08e..1344c417a9 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -31,6 +31,9 @@ use datafusion_optimizer::utils::conjunction;
use datafusion_physical_expr::{create_physical_expr, LexOrdering,
PhysicalSortExpr};
use futures::{future, stream, StreamExt, TryStreamExt};
+use crate::datasource::file_format::file_compression_type::{
+ FileCompressionType, FileTypeExt,
+};
use crate::datasource::physical_plan::{FileScanConfig, FileSinkConfig};
use crate::datasource::{
file_format::{
@@ -49,7 +52,7 @@ use crate::{
logical_expr::Expr,
physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics},
};
-use datafusion_common::{FileCompressionType, FileType};
+use datafusion_common::FileType;
use datafusion_execution::cache::cache_manager::FileStatisticsCache;
use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
@@ -986,7 +989,9 @@ mod tests {
use crate::prelude::*;
use crate::{
assert_batches_eq,
- datasource::file_format::{avro::AvroFormat, parquet::ParquetFormat},
+ datasource::file_format::{
+ avro::AvroFormat, file_compression_type::FileTypeExt,
parquet::ParquetFormat,
+ },
execution::options::ReadOptions,
logical_expr::{col, lit},
test::{columns, object_store::register_test_store},
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs
b/datafusion/core/src/datasource/listing_table_factory.rs
index 40aeccf233..9c438a4794 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -30,6 +30,7 @@ use datafusion_expr::CreateExternalTable;
use crate::datasource::file_format::arrow::ArrowFormat;
use crate::datasource::file_format::avro::AvroFormat;
use crate::datasource::file_format::csv::CsvFormat;
+use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::file_format::json::JsonFormat;
use crate::datasource::file_format::parquet::ParquetFormat;
use crate::datasource::file_format::FileFormat;
@@ -39,7 +40,7 @@ use crate::datasource::listing::{
use crate::datasource::provider::TableProviderFactory;
use crate::datasource::TableProvider;
use crate::execution::context::SessionState;
-use datafusion_common::{FileCompressionType, FileType};
+use datafusion_common::FileType;
use super::listing::ListingTableInsertMode;
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs
b/datafusion/core/src/datasource/physical_plan/csv.rs
index aedc9198a8..dfc6acdde0 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -17,6 +17,7 @@
//! Execution plan for reading CSV files
+use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::listing::{FileRange, ListingTableUrl};
use crate::datasource::physical_plan::file_stream::{
FileOpenFuture, FileOpener, FileStream,
@@ -31,7 +32,6 @@ use crate::physical_plan::{
};
use arrow::csv;
use arrow::datatypes::SchemaRef;
-use datafusion_common::FileCompressionType;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{
ordering_equivalence_properties_helper, LexOrdering,
OrderingEquivalenceProperties,
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs
b/datafusion/core/src/datasource/physical_plan/json.rs
index d5c3a5635a..537855704a 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -16,6 +16,7 @@
// under the License.
//! Execution plan for reading line-delimited JSON files
+use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::listing::ListingTableUrl;
use crate::datasource::physical_plan::file_stream::{
FileOpenFuture, FileOpener, FileStream,
@@ -28,7 +29,6 @@ use crate::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
SendableRecordBatchStream,
Statistics,
};
-use datafusion_common::FileCompressionType;
use datafusion_execution::TaskContext;
use arrow::json::ReaderBuilder;
@@ -318,6 +318,7 @@ mod tests {
use crate::assert_batches_eq;
use crate::dataframe::DataFrameWriteOptions;
+ use crate::datasource::file_format::file_compression_type::FileTypeExt;
use crate::datasource::file_format::{json::JsonFormat, FileFormat};
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 565f76affa..1279749df1 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -1672,6 +1672,7 @@ mod tests {
use std::ops::Deref;
use super::*;
+ use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::datasource::physical_plan::{FileScanConfig, ParquetExec};
@@ -1694,7 +1695,7 @@ mod tests {
use crate::physical_plan::sorts::sort::SortExec;
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
- use datafusion_common::{FileCompressionType, ScalarValue};
+ use datafusion_common::ScalarValue;
use datafusion_expr::logical_plan::JoinType;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index b406a54105..e223e43cb3 100644
--- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -281,11 +281,11 @@ mod tests {
use crate::prelude::SessionConfig;
+ use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::physical_plan::{CsvExec, FileScanConfig};
use crate::physical_plan::coalesce_batches::CoalesceBatchesExec;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
- use datafusion_common::FileCompressionType;
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::joins::{HashJoinExec, PartitionMode};
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index a26be4857d..903542ca3f 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -17,6 +17,9 @@
//! Common unit test utility methods
+use crate::datasource::file_format::file_compression_type::{
+ FileCompressionType, FileTypeExt,
+};
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::datasource::physical_plan::{CsvExec, FileScanConfig};
@@ -34,8 +37,8 @@ use arrow::record_batch::RecordBatch;
use bzip2::write::BzEncoder;
#[cfg(feature = "compression")]
use bzip2::Compression as BzCompression;
+use datafusion_common::FileType;
use datafusion_common::{DataFusionError, Statistics};
-use datafusion_common::{FileCompressionType, FileType};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::{Partitioning, PhysicalSortExpr};
use datafusion_physical_plan::{DisplayAs, DisplayFormatType};
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index 71d5a35569..249abf8e70 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -21,6 +21,7 @@ use std::sync::Arc;
use datafusion::arrow::compute::SortOptions;
use datafusion::arrow::datatypes::SchemaRef;
+use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
use datafusion::datasource::physical_plan::{AvroExec, CsvExec, ParquetExec};
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::FunctionRegistry;
@@ -47,7 +48,6 @@ use datafusion::physical_plan::windows::{create_window_expr, WindowAggExec};
use datafusion::physical_plan::{
udaf, AggregateExpr, ExecutionPlan, Partitioning, PhysicalExpr, WindowExpr,
};
-use datafusion_common::FileCompressionType;
use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result};
use prost::bytes::BufMut;
use prost::Message;