This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 1186983854 Move `FileCompressionType` out of `common` and into `core` (#7596)
1186983854 is described below

commit 118698385409ade9f5b0b81bb6ec6e48113049ec
Author: Huaijin <[email protected]>
AuthorDate: Thu Sep 21 04:41:57 2023 +0800

    Move `FileCompressionType` out of `common` and into `core` (#7596)
    
    * move FileCompressionType out of common and into core
    
    * fix ci
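    
    A minimal migration sketch for downstream code (a hypothetical snippet,
    assuming a crate that depends on the `datafusion` core crate):
    
        // Before this commit the type was re-exported from datafusion_common:
        // use datafusion_common::FileCompressionType;
    
        // After this commit it lives in the core crate's datasource module:
        use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
        // GetExt remains exported from datafusion_common
        use datafusion_common::GetExt;
    
        fn gzip_extension() -> String {
            FileCompressionType::GZIP.get_ext() // returns ".gz"
        }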
---
 datafusion-cli/Cargo.lock                          |   9 -
 datafusion-examples/examples/csv_opener.rs         |   2 +-
 datafusion-examples/examples/csv_sql.rs            |   2 +-
 datafusion-examples/examples/json_opener.rs        |   2 +-
 datafusion/common/Cargo.toml                       |  17 +-
 datafusion/common/src/file_options/file_type.rs    | 295 +--------------------
 datafusion/common/src/lib.rs                       |   5 +-
 datafusion/core/src/datasource/file_format/csv.rs  |   4 +-
 .../file_format/file_compression_type.rs}          | 111 +-------
 datafusion/core/src/datasource/file_format/json.rs |   2 +-
 datafusion/core/src/datasource/file_format/mod.rs  |   1 +
 .../core/src/datasource/file_format/options.rs     |   5 +-
 .../core/src/datasource/file_format/parquet.rs     |   5 +-
 .../core/src/datasource/file_format/write.rs       |   3 +-
 datafusion/core/src/datasource/listing/table.rs    |   9 +-
 .../core/src/datasource/listing_table_factory.rs   |   3 +-
 .../core/src/datasource/physical_plan/csv.rs       |   2 +-
 .../core/src/datasource/physical_plan/json.rs      |   3 +-
 .../src/physical_optimizer/enforce_distribution.rs |   3 +-
 .../replace_with_order_preserving_variants.rs      |   2 +-
 datafusion/core/src/test/mod.rs                    |   5 +-
 datafusion/proto/src/physical_plan/mod.rs          |   2 +-
 22 files changed, 50 insertions(+), 442 deletions(-)
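
As part of the move, `get_ext_with_compression` changes from an inherent method
on `FileType` to a new `FileTypeExt` extension trait in core. A minimal usage
sketch (hypothetical, based on the trait definition in the diff below):

    use datafusion::datasource::file_format::file_compression_type::{
        FileCompressionType, FileTypeExt,
    };
    use datafusion_common::{FileType, Result};

    fn compressed_csv_ext() -> Result<String> {
        // FileTypeExt must be in scope for the method call to resolve.
        FileType::CSV.get_ext_with_compression(FileCompressionType::GZIP)
        // returns Ok(".csv.gz")
    }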

diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index 4e3d0c8850..74302f2ddc 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -1124,20 +1124,11 @@ version = "31.0.0"
 dependencies = [
  "arrow",
  "arrow-array",
- "async-compression",
- "bytes",
- "bzip2",
  "chrono",
- "flate2",
- "futures",
  "num_cpus",
  "object_store",
  "parquet",
  "sqlparser",
- "tokio",
- "tokio-util",
- "xz2",
- "zstd",
 ]
 
 [[package]]
diff --git a/datafusion-examples/examples/csv_opener.rs b/datafusion-examples/examples/csv_opener.rs
index 0587b515b2..5126666d5e 100644
--- a/datafusion-examples/examples/csv_opener.rs
+++ b/datafusion-examples/examples/csv_opener.rs
@@ -20,6 +20,7 @@ use std::{sync::Arc, vec};
 use datafusion::{
     assert_batches_eq,
     datasource::{
+        file_format::file_compression_type::FileCompressionType,
         listing::PartitionedFile,
         object_store::ObjectStoreUrl,
         physical_plan::{CsvConfig, CsvOpener, FileScanConfig, FileStream},
@@ -28,7 +29,6 @@ use datafusion::{
     physical_plan::metrics::ExecutionPlanMetricsSet,
     test_util::aggr_test_schema,
 };
-use datafusion_common::FileCompressionType;
 use futures::StreamExt;
 use object_store::local::LocalFileSystem;
 
diff --git a/datafusion-examples/examples/csv_sql.rs b/datafusion-examples/examples/csv_sql.rs
index a9aa24c6aa..851fdcb626 100644
--- a/datafusion-examples/examples/csv_sql.rs
+++ b/datafusion-examples/examples/csv_sql.rs
@@ -15,9 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
 use datafusion::error::Result;
 use datafusion::prelude::*;
-use datafusion_common::FileCompressionType;
 
 /// This example demonstrates executing a simple query against an Arrow data source (CSV) and
 /// fetching results
diff --git a/datafusion-examples/examples/json_opener.rs b/datafusion-examples/examples/json_opener.rs
index 71bb0ae4fe..74ba6f3852 100644
--- a/datafusion-examples/examples/json_opener.rs
+++ b/datafusion-examples/examples/json_opener.rs
@@ -21,6 +21,7 @@ use arrow_schema::{DataType, Field, Schema};
 use datafusion::{
     assert_batches_eq,
     datasource::{
+        file_format::file_compression_type::FileCompressionType,
         listing::PartitionedFile,
         object_store::ObjectStoreUrl,
         physical_plan::{FileScanConfig, FileStream, JsonOpener},
@@ -28,7 +29,6 @@ use datafusion::{
     error::Result,
     physical_plan::metrics::ExecutionPlanMetricsSet,
 };
-use datafusion_common::FileCompressionType;
 use futures::StreamExt;
 use object_store::ObjectStore;
 
diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml
index ee06d2103a..a46649c800 100644
--- a/datafusion/common/Cargo.toml
+++ b/datafusion/common/Cargo.toml
@@ -35,34 +35,19 @@ path = "src/lib.rs"
 [features]
 avro = ["apache-avro"]
 backtrace = []
-compression = ["xz2", "bzip2", "flate2", "zstd", "async-compression"]
-default = ["compression", "parquet"]
+default = ["parquet"]
 pyarrow = ["pyo3", "arrow/pyarrow"]
 
 [dependencies]
 apache-avro = { version = "0.15", default-features = false, features = ["snappy"], optional = true }
 arrow = { workspace = true }
 arrow-array = { workspace = true }
-async-compression = { version = "0.4.0", features = ["bzip2", "gzip", "xz", "zstd", "futures-io", "tokio"], optional = true }
-bytes = "1.4"
-bzip2 = { version = "0.4.3", optional = true }
 chrono = { workspace = true }
-flate2 = { version = "1.0.24", optional = true }
-futures = "0.3"
 num_cpus = "1.13.0"
 object_store = { version = "0.7.0", default-features = false, optional = true }
 parquet = { workspace = true, optional = true }
 pyo3 = { version = "0.19.0", optional = true }
 sqlparser = { workspace = true }
-tokio = { version = "1.28", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
-tokio-util = { version = "0.7.4", features = ["io"] }
-xz2 = { version = "0.1", optional = true }
-zstd = { version = "0.12", optional = true }
-
-
-
-
-
 
 [dev-dependencies]
 rand = "0.8.4"
diff --git a/datafusion/common/src/file_options/file_type.rs b/datafusion/common/src/file_options/file_type.rs
index 7d12a267ca..c83da387c2 100644
--- a/datafusion/common/src/file_options/file_type.rs
+++ b/datafusion/common/src/file_options/file_type.rs
@@ -18,38 +18,10 @@
 //! File type abstraction
 
 use crate::error::{DataFusionError, Result};
-#[cfg(feature = "compression")]
-use async_compression::tokio::bufread::{
-    BzDecoder as AsyncBzDecoder, BzEncoder as AsyncBzEncoder,
-    GzipDecoder as AsyncGzDecoder, GzipEncoder as AsyncGzEncoder,
-    XzDecoder as AsyncXzDecoder, XzEncoder as AsyncXzEncoder,
-    ZstdDecoder as AsyncZstdDecoer, ZstdEncoder as AsyncZstdEncoder,
-};
-
-use crate::parsers::CompressionTypeVariant;
-#[cfg(feature = "compression")]
-use async_compression::tokio::write::{BzEncoder, GzipEncoder, XzEncoder, ZstdEncoder};
-use bytes::Bytes;
-#[cfg(feature = "compression")]
-use bzip2::read::MultiBzDecoder;
-#[cfg(feature = "compression")]
-use flate2::read::MultiGzDecoder;
 
 use core::fmt;
-use futures::stream::BoxStream;
-use futures::StreamExt;
-#[cfg(feature = "compression")]
-use futures::TryStreamExt;
 use std::fmt::Display;
 use std::str::FromStr;
-use tokio::io::AsyncWrite;
-#[cfg(feature = "compression")]
-use tokio_util::io::{ReaderStream, StreamReader};
-#[cfg(feature = "compression")]
-use xz2::read::XzDecoder;
-#[cfg(feature = "compression")]
-use zstd::Decoder as ZstdDecoder;
-use CompressionTypeVariant::*;
 
 /// The default file extension of arrow files
 pub const DEFAULT_ARROW_EXTENSION: &str = ".arrow";
@@ -68,182 +40,6 @@ pub trait GetExt {
     fn get_ext(&self) -> String;
 }
 
-/// Readable file compression type
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub struct FileCompressionType {
-    variant: CompressionTypeVariant,
-}
-
-impl GetExt for FileCompressionType {
-    fn get_ext(&self) -> String {
-        match self.variant {
-            GZIP => ".gz".to_owned(),
-            BZIP2 => ".bz2".to_owned(),
-            XZ => ".xz".to_owned(),
-            ZSTD => ".zst".to_owned(),
-            UNCOMPRESSED => "".to_owned(),
-        }
-    }
-}
-
-impl From<CompressionTypeVariant> for FileCompressionType {
-    fn from(t: CompressionTypeVariant) -> Self {
-        Self { variant: t }
-    }
-}
-
-impl FromStr for FileCompressionType {
-    type Err = DataFusionError;
-
-    fn from_str(s: &str) -> Result<Self> {
-        let variant = CompressionTypeVariant::from_str(s).map_err(|_| {
-            DataFusionError::NotImplemented(format!("Unknown FileCompressionType: {s}"))
-        })?;
-        Ok(Self { variant })
-    }
-}
-
-/// `FileCompressionType` implementation
-impl FileCompressionType {
-    /// Gzip-ed file
-    pub const GZIP: Self = Self { variant: GZIP };
-
-    /// Bzip2-ed file
-    pub const BZIP2: Self = Self { variant: BZIP2 };
-
-    /// Xz-ed file (liblzma)
-    pub const XZ: Self = Self { variant: XZ };
-
-    /// Zstd-ed file
-    pub const ZSTD: Self = Self { variant: ZSTD };
-
-    /// Uncompressed file
-    pub const UNCOMPRESSED: Self = Self {
-        variant: UNCOMPRESSED,
-    };
-
-    /// The file is compressed or not
-    pub const fn is_compressed(&self) -> bool {
-        self.variant.is_compressed()
-    }
-
-    /// Given a `Stream`, create a `Stream` which data are compressed with `FileCompressionType`.
-    pub fn convert_to_compress_stream(
-        &self,
-        s: BoxStream<'static, Result<Bytes>>,
-    ) -> Result<BoxStream<'static, Result<Bytes>>> {
-        Ok(match self.variant {
-            #[cfg(feature = "compression")]
-            GZIP => ReaderStream::new(AsyncGzEncoder::new(StreamReader::new(s)))
-                .map_err(DataFusionError::from)
-                .boxed(),
-            #[cfg(feature = "compression")]
-            BZIP2 => ReaderStream::new(AsyncBzEncoder::new(StreamReader::new(s)))
-                .map_err(DataFusionError::from)
-                .boxed(),
-            #[cfg(feature = "compression")]
-            XZ => ReaderStream::new(AsyncXzEncoder::new(StreamReader::new(s)))
-                .map_err(DataFusionError::from)
-                .boxed(),
-            #[cfg(feature = "compression")]
-            ZSTD => ReaderStream::new(AsyncZstdEncoder::new(StreamReader::new(s)))
-                .map_err(DataFusionError::from)
-                .boxed(),
-            #[cfg(not(feature = "compression"))]
-            GZIP | BZIP2 | XZ | ZSTD => {
-                return Err(DataFusionError::NotImplemented(
-                    "Compression feature is not enabled".to_owned(),
-                ))
-            }
-            UNCOMPRESSED => s.boxed(),
-        })
-    }
-
-    /// Wrap the given `AsyncWrite` so that it performs compressed writes
-    /// according to this `FileCompressionType`.
-    pub fn convert_async_writer(
-        &self,
-        w: Box<dyn AsyncWrite + Send + Unpin>,
-    ) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
-        Ok(match self.variant {
-            #[cfg(feature = "compression")]
-            GZIP => Box::new(GzipEncoder::new(w)),
-            #[cfg(feature = "compression")]
-            BZIP2 => Box::new(BzEncoder::new(w)),
-            #[cfg(feature = "compression")]
-            XZ => Box::new(XzEncoder::new(w)),
-            #[cfg(feature = "compression")]
-            ZSTD => Box::new(ZstdEncoder::new(w)),
-            #[cfg(not(feature = "compression"))]
-            GZIP | BZIP2 | XZ | ZSTD => {
-                return Err(DataFusionError::NotImplemented(
-                    "Compression feature is not enabled".to_owned(),
-                ))
-            }
-            UNCOMPRESSED => w,
-        })
-    }
-
-    /// Given a `Stream`, create a `Stream` which data are decompressed with `FileCompressionType`.
-    pub fn convert_stream(
-        &self,
-        s: BoxStream<'static, Result<Bytes>>,
-    ) -> Result<BoxStream<'static, Result<Bytes>>> {
-        Ok(match self.variant {
-            #[cfg(feature = "compression")]
-            GZIP => ReaderStream::new(AsyncGzDecoder::new(StreamReader::new(s)))
-                .map_err(DataFusionError::from)
-                .boxed(),
-            #[cfg(feature = "compression")]
-            BZIP2 => ReaderStream::new(AsyncBzDecoder::new(StreamReader::new(s)))
-                .map_err(DataFusionError::from)
-                .boxed(),
-            #[cfg(feature = "compression")]
-            XZ => ReaderStream::new(AsyncXzDecoder::new(StreamReader::new(s)))
-                .map_err(DataFusionError::from)
-                .boxed(),
-            #[cfg(feature = "compression")]
-            ZSTD => ReaderStream::new(AsyncZstdDecoer::new(StreamReader::new(s)))
-                .map_err(DataFusionError::from)
-                .boxed(),
-            #[cfg(not(feature = "compression"))]
-            GZIP | BZIP2 | XZ | ZSTD => {
-                return Err(DataFusionError::NotImplemented(
-                    "Compression feature is not enabled".to_owned(),
-                ))
-            }
-            UNCOMPRESSED => s.boxed(),
-        })
-    }
-
-    /// Given a `Read`, create a `Read` which data are decompressed with `FileCompressionType`.
-    pub fn convert_read<T: std::io::Read + Send + 'static>(
-        &self,
-        r: T,
-    ) -> Result<Box<dyn std::io::Read + Send>> {
-        Ok(match self.variant {
-            #[cfg(feature = "compression")]
-            GZIP => Box::new(MultiGzDecoder::new(r)),
-            #[cfg(feature = "compression")]
-            BZIP2 => Box::new(MultiBzDecoder::new(r)),
-            #[cfg(feature = "compression")]
-            XZ => Box::new(XzDecoder::new_multi_decoder(r)),
-            #[cfg(feature = "compression")]
-            ZSTD => match ZstdDecoder::new(r) {
-                Ok(decoder) => Box::new(decoder),
-                Err(e) => return Err(DataFusionError::External(Box::new(e))),
-            },
-            #[cfg(not(feature = "compression"))]
-            GZIP | BZIP2 | XZ | ZSTD => {
-                return Err(DataFusionError::NotImplemented(
-                    "Compression feature is not enabled".to_owned(),
-                ))
-            }
-            UNCOMPRESSED => Box::new(r),
-        })
-    }
-}
-
 /// Readable file type
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub enum FileType {
@@ -302,73 +98,12 @@ impl FromStr for FileType {
     }
 }
 
-impl FileType {
-    /// Given a `FileCompressionType`, return the `FileType`'s extension with compression suffix
-    pub fn get_ext_with_compression(&self, c: FileCompressionType) -> Result<String> {
-        let ext = self.get_ext();
-
-        match self {
-            FileType::JSON | FileType::CSV => Ok(format!("{}{}", ext, c.get_ext())),
-            FileType::PARQUET | FileType::AVRO | FileType::ARROW => match c.variant {
-                UNCOMPRESSED => Ok(ext),
-                _ => Err(DataFusionError::Internal(
-                    "FileCompressionType can be specified for CSV/JSON 
FileType.".into(),
-                )),
-            },
-        }
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use crate::error::DataFusionError;
-    use crate::file_options::file_type::{FileCompressionType, FileType};
+    use crate::file_options::FileType;
     use std::str::FromStr;
 
-    #[test]
-    fn get_ext_with_compression() {
-        for (file_type, compression, extension) in [
-            (FileType::CSV, FileCompressionType::UNCOMPRESSED, ".csv"),
-            (FileType::CSV, FileCompressionType::GZIP, ".csv.gz"),
-            (FileType::CSV, FileCompressionType::XZ, ".csv.xz"),
-            (FileType::CSV, FileCompressionType::BZIP2, ".csv.bz2"),
-            (FileType::CSV, FileCompressionType::ZSTD, ".csv.zst"),
-            (FileType::JSON, FileCompressionType::UNCOMPRESSED, ".json"),
-            (FileType::JSON, FileCompressionType::GZIP, ".json.gz"),
-            (FileType::JSON, FileCompressionType::XZ, ".json.xz"),
-            (FileType::JSON, FileCompressionType::BZIP2, ".json.bz2"),
-            (FileType::JSON, FileCompressionType::ZSTD, ".json.zst"),
-        ] {
-            assert_eq!(
-                file_type.get_ext_with_compression(compression).unwrap(),
-                extension
-            );
-        }
-
-        // Cannot specify compression for these file types
-        for (file_type, extension) in
-            [(FileType::AVRO, ".avro"), (FileType::PARQUET, ".parquet")]
-        {
-            assert_eq!(
-                file_type
-                    .get_ext_with_compression(FileCompressionType::UNCOMPRESSED)
-                    .unwrap(),
-                extension
-            );
-            for compression in [
-                FileCompressionType::GZIP,
-                FileCompressionType::XZ,
-                FileCompressionType::BZIP2,
-                FileCompressionType::ZSTD,
-            ] {
-                assert!(matches!(
-                    file_type.get_ext_with_compression(compression),
-                    Err(DataFusionError::Internal(_))
-                ));
-            }
-        }
-    }
-
     #[test]
     fn from_str() {
         for (ext, file_type) in [
@@ -388,33 +123,5 @@ mod tests {
             FileType::from_str("Unknown"),
             Err(DataFusionError::NotImplemented(_))
         ));
-
-        for (ext, compression_type) in [
-            ("gz", FileCompressionType::GZIP),
-            ("GZ", FileCompressionType::GZIP),
-            ("gzip", FileCompressionType::GZIP),
-            ("GZIP", FileCompressionType::GZIP),
-            ("xz", FileCompressionType::XZ),
-            ("XZ", FileCompressionType::XZ),
-            ("bz2", FileCompressionType::BZIP2),
-            ("BZ2", FileCompressionType::BZIP2),
-            ("bzip2", FileCompressionType::BZIP2),
-            ("BZIP2", FileCompressionType::BZIP2),
-            ("zst", FileCompressionType::ZSTD),
-            ("ZST", FileCompressionType::ZSTD),
-            ("zstd", FileCompressionType::ZSTD),
-            ("ZSTD", FileCompressionType::ZSTD),
-            ("", FileCompressionType::UNCOMPRESSED),
-        ] {
-            assert_eq!(
-                FileCompressionType::from_str(ext).unwrap(),
-                compression_type
-            );
-        }
-
-        assert!(matches!(
-            FileCompressionType::from_str("Unknown"),
-            Err(DataFusionError::NotImplemented(_))
-        ));
     }
 }
diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs
index 420bcd963c..eeb5b26813 100644
--- a/datafusion/common/src/lib.rs
+++ b/datafusion/common/src/lib.rs
@@ -46,9 +46,8 @@ pub use error::{
 };
 
 pub use file_options::file_type::{
-    FileCompressionType, FileType, GetExt, DEFAULT_ARROW_EXTENSION,
-    DEFAULT_AVRO_EXTENSION, DEFAULT_CSV_EXTENSION, DEFAULT_JSON_EXTENSION,
-    DEFAULT_PARQUET_EXTENSION,
+    FileType, GetExt, DEFAULT_ARROW_EXTENSION, DEFAULT_AVRO_EXTENSION,
+    DEFAULT_CSV_EXTENSION, DEFAULT_JSON_EXTENSION, DEFAULT_PARQUET_EXTENSION,
 };
 pub use file_options::FileTypeWriterOptions;
 pub use functional_dependencies::{
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 4578ab5a43..897174659e 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -38,6 +38,7 @@ use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
 use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};
 
 use super::{FileFormat, DEFAULT_SCHEMA_INFER_MAX_RECORD};
+use crate::datasource::file_format::file_compression_type::FileCompressionType;
 use crate::datasource::file_format::write::{
     create_writer, stateless_serialize_and_write_files, BatchSerializer, FileWriterMode,
 };
@@ -49,7 +50,6 @@ use crate::execution::context::SessionState;
 use crate::physical_plan::insert::{DataSink, FileSinkExec};
 use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};
 use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
-use datafusion_common::FileCompressionType;
 use rand::distributions::{Alphanumeric, DistString};
 
 /// Character Separated Value `FileFormat` implementation.
@@ -599,6 +599,7 @@ mod tests {
     use super::*;
     use crate::arrow::util::pretty;
     use crate::assert_batches_eq;
+    use crate::datasource::file_format::file_compression_type::FileCompressionType;
     use crate::datasource::file_format::test_util::VariableStream;
     use crate::datasource::listing::ListingOptions;
     use crate::physical_plan::collect;
@@ -609,7 +610,6 @@ mod tests {
     use chrono::DateTime;
     use datafusion_common::cast::as_string_array;
     use datafusion_common::internal_err;
-    use datafusion_common::FileCompressionType;
     use datafusion_common::FileType;
     use datafusion_common::GetExt;
     use datafusion_expr::{col, lit};
diff --git a/datafusion/common/src/file_options/file_type.rs b/datafusion/core/src/datasource/file_format/file_compression_type.rs
similarity index 79%
copy from datafusion/common/src/file_options/file_type.rs
copy to datafusion/core/src/datasource/file_format/file_compression_type.rs
index 7d12a267ca..bd28687670 100644
--- a/datafusion/common/src/file_options/file_type.rs
+++ b/datafusion/core/src/datasource/file_format/file_compression_type.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! File type abstraction
+//! File Compression type abstraction
 
 use crate::error::{DataFusionError, Result};
 #[cfg(feature = "compression")]
@@ -26,21 +26,19 @@ use async_compression::tokio::bufread::{
     ZstdDecoder as AsyncZstdDecoer, ZstdEncoder as AsyncZstdEncoder,
 };
 
-use crate::parsers::CompressionTypeVariant;
 #[cfg(feature = "compression")]
 use async_compression::tokio::write::{BzEncoder, GzipEncoder, XzEncoder, ZstdEncoder};
 use bytes::Bytes;
 #[cfg(feature = "compression")]
 use bzip2::read::MultiBzDecoder;
+use datafusion_common::{parsers::CompressionTypeVariant, FileType, GetExt};
 #[cfg(feature = "compression")]
 use flate2::read::MultiGzDecoder;
 
-use core::fmt;
 use futures::stream::BoxStream;
 use futures::StreamExt;
 #[cfg(feature = "compression")]
 use futures::TryStreamExt;
-use std::fmt::Display;
 use std::str::FromStr;
 use tokio::io::AsyncWrite;
 #[cfg(feature = "compression")]
@@ -51,23 +49,6 @@ use xz2::read::XzDecoder;
 use zstd::Decoder as ZstdDecoder;
 use CompressionTypeVariant::*;
 
-/// The default file extension of arrow files
-pub const DEFAULT_ARROW_EXTENSION: &str = ".arrow";
-/// The default file extension of avro files
-pub const DEFAULT_AVRO_EXTENSION: &str = ".avro";
-/// The default file extension of csv files
-pub const DEFAULT_CSV_EXTENSION: &str = ".csv";
-/// The default file extension of json files
-pub const DEFAULT_JSON_EXTENSION: &str = ".json";
-/// The default file extension of parquet files
-pub const DEFAULT_PARQUET_EXTENSION: &str = ".parquet";
-
-/// Define each `FileType`/`FileCompressionType`'s extension
-pub trait GetExt {
-    /// File extension getter
-    fn get_ext(&self) -> String;
-}
-
 /// Readable file compression type
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub struct FileCompressionType {
@@ -244,67 +225,14 @@ impl FileCompressionType {
     }
 }
 
-/// Readable file type
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
-pub enum FileType {
-    /// Apache Arrow file
-    ARROW,
-    /// Apache Avro file
-    AVRO,
-    /// Apache Parquet file
-    PARQUET,
-    /// CSV file
-    CSV,
-    /// JSON file
-    JSON,
-}
-
-impl GetExt for FileType {
-    fn get_ext(&self) -> String {
-        match self {
-            FileType::ARROW => DEFAULT_ARROW_EXTENSION.to_owned(),
-            FileType::AVRO => DEFAULT_AVRO_EXTENSION.to_owned(),
-            FileType::PARQUET => DEFAULT_PARQUET_EXTENSION.to_owned(),
-            FileType::CSV => DEFAULT_CSV_EXTENSION.to_owned(),
-            FileType::JSON => DEFAULT_JSON_EXTENSION.to_owned(),
-        }
-    }
-}
-
-impl Display for FileType {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        let out = match self {
-            FileType::CSV => "csv",
-            FileType::JSON => "json",
-            FileType::PARQUET => "parquet",
-            FileType::AVRO => "avro",
-            FileType::ARROW => "arrow",
-        };
-        write!(f, "{}", out)
-    }
-}
-
-impl FromStr for FileType {
-    type Err = DataFusionError;
-
-    fn from_str(s: &str) -> Result<Self> {
-        let s = s.to_uppercase();
-        match s.as_str() {
-            "ARROW" => Ok(FileType::ARROW),
-            "AVRO" => Ok(FileType::AVRO),
-            "PARQUET" => Ok(FileType::PARQUET),
-            "CSV" => Ok(FileType::CSV),
-            "JSON" | "NDJSON" => Ok(FileType::JSON),
-            _ => Err(DataFusionError::NotImplemented(format!(
-                "Unknown FileType: {s}"
-            ))),
-        }
-    }
+/// Trait for extending the functionality of the `FileType` enum.
+pub trait FileTypeExt {
+    /// Given a `FileCompressionType`, return the `FileType`'s extension with compression suffix
+    fn get_ext_with_compression(&self, c: FileCompressionType) -> Result<String>;
 }
 
-impl FileType {
-    /// Given a `FileCompressionType`, return the `FileType`'s extension with compression suffix
-    pub fn get_ext_with_compression(&self, c: FileCompressionType) -> Result<String> {
+impl FileTypeExt for FileType {
+    fn get_ext_with_compression(&self, c: FileCompressionType) -> Result<String> {
         let ext = self.get_ext();
 
         match self {
@@ -321,8 +249,11 @@ impl FileType {
 
 #[cfg(test)]
 mod tests {
+    use crate::datasource::file_format::file_compression_type::{
+        FileCompressionType, FileTypeExt,
+    };
     use crate::error::DataFusionError;
-    use crate::file_options::file_type::{FileCompressionType, FileType};
+    use datafusion_common::file_options::file_type::FileType;
     use std::str::FromStr;
 
     #[test]
@@ -371,24 +302,6 @@ mod tests {
 
     #[test]
     fn from_str() {
-        for (ext, file_type) in [
-            ("csv", FileType::CSV),
-            ("CSV", FileType::CSV),
-            ("json", FileType::JSON),
-            ("JSON", FileType::JSON),
-            ("avro", FileType::AVRO),
-            ("AVRO", FileType::AVRO),
-            ("parquet", FileType::PARQUET),
-            ("PARQUET", FileType::PARQUET),
-        ] {
-            assert_eq!(FileType::from_str(ext).unwrap(), file_type);
-        }
-
-        assert!(matches!(
-            FileType::from_str("Unknown"),
-            Err(DataFusionError::NotImplemented(_))
-        ));
-
         for (ext, compression_type) in [
             ("gz", FileCompressionType::GZIP),
             ("GZ", FileCompressionType::GZIP),
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index 0aa87a9a32..c715317a95 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -51,6 +51,7 @@ use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};
 
 use super::FileFormat;
 use super::FileScanConfig;
+use crate::datasource::file_format::file_compression_type::FileCompressionType;
 use crate::datasource::file_format::write::{
     create_writer, stateless_serialize_and_write_files, BatchSerializer, FileWriterMode,
 };
@@ -60,7 +61,6 @@ use crate::datasource::physical_plan::NdJsonExec;
 use crate::error::Result;
 use crate::execution::context::SessionState;
 use crate::physical_plan::ExecutionPlan;
-use datafusion_common::FileCompressionType;
 
 /// New line delimited JSON `FileFormat` implementation.
 #[derive(Debug)]
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index a3ce74eeee..86f265ab94 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -24,6 +24,7 @@ pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000;
 pub mod arrow;
 pub mod avro;
 pub mod csv;
+pub mod file_compression_type;
 pub mod json;
 pub mod options;
 pub mod parquet;
diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs
index 5289adc6f2..40d9878a01 100644
--- a/datafusion/core/src/datasource/file_format/options.rs
+++ b/datafusion/core/src/datasource/file_format/options.rs
@@ -24,6 +24,7 @@ use async_trait::async_trait;
 use datafusion_common::{plan_err, DataFusionError};
 
 use crate::datasource::file_format::arrow::ArrowFormat;
+use crate::datasource::file_format::file_compression_type::FileCompressionType;
 use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
 use crate::datasource::listing::{ListingTableInsertMode, ListingTableUrl};
 use crate::datasource::{
@@ -36,8 +37,8 @@ use crate::error::Result;
 use crate::execution::context::{SessionConfig, SessionState};
 use crate::logical_expr::Expr;
 use datafusion_common::{
-    FileCompressionType, DEFAULT_ARROW_EXTENSION, DEFAULT_AVRO_EXTENSION,
-    DEFAULT_CSV_EXTENSION, DEFAULT_JSON_EXTENSION, DEFAULT_PARQUET_EXTENSION,
+    DEFAULT_ARROW_EXTENSION, DEFAULT_AVRO_EXTENSION, DEFAULT_CSV_EXTENSION,
+    DEFAULT_JSON_EXTENSION, DEFAULT_PARQUET_EXTENSION,
 };
 
 /// Options that control the reading of CSV files.
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 3b15ccd17a..c645dc20da 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -29,13 +29,12 @@ use tokio::io::{AsyncWrite, AsyncWriteExt};
 use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
 use tokio::task::{JoinHandle, JoinSet};
 
+use crate::datasource::file_format::file_compression_type::FileCompressionType;
 use arrow::datatypes::SchemaRef;
 use arrow::datatypes::{Fields, Schema};
 use async_trait::async_trait;
 use bytes::{BufMut, BytesMut};
-use datafusion_common::{
-    exec_err, not_impl_err, plan_err, DataFusionError, FileCompressionType, FileType,
-};
+use datafusion_common::{exec_err, not_impl_err, plan_err, DataFusionError, FileType};
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::PhysicalExpr;
 use futures::{StreamExt, TryStreamExt};
diff --git a/datafusion/core/src/datasource/file_format/write.rs b/datafusion/core/src/datasource/file_format/write.rs
index 222fe5b519..42d18eef63 100644
--- a/datafusion/core/src/datasource/file_format/write.rs
+++ b/datafusion/core/src/datasource/file_format/write.rs
@@ -24,12 +24,13 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
+use crate::datasource::file_format::file_compression_type::FileCompressionType;
 use crate::datasource::physical_plan::FileMeta;
 use crate::error::Result;
 use crate::physical_plan::SendableRecordBatchStream;
 
 use arrow_array::RecordBatch;
-use datafusion_common::{exec_err, internal_err, DataFusionError, FileCompressionType};
+use datafusion_common::{exec_err, internal_err, DataFusionError};
 
 use async_trait::async_trait;
 use bytes::Bytes;
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index c84cdde08e..1344c417a9 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -31,6 +31,9 @@ use datafusion_optimizer::utils::conjunction;
 use datafusion_physical_expr::{create_physical_expr, LexOrdering, PhysicalSortExpr};
 use futures::{future, stream, StreamExt, TryStreamExt};
 
+use crate::datasource::file_format::file_compression_type::{
+    FileCompressionType, FileTypeExt,
+};
 use crate::datasource::physical_plan::{FileScanConfig, FileSinkConfig};
 use crate::datasource::{
     file_format::{
@@ -49,7 +52,7 @@ use crate::{
     logical_expr::Expr,
     physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics},
 };
-use datafusion_common::{FileCompressionType, FileType};
+use datafusion_common::FileType;
 use datafusion_execution::cache::cache_manager::FileStatisticsCache;
 use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
 
@@ -986,7 +989,9 @@ mod tests {
     use crate::prelude::*;
     use crate::{
         assert_batches_eq,
-        datasource::file_format::{avro::AvroFormat, parquet::ParquetFormat},
+        datasource::file_format::{
+            avro::AvroFormat, file_compression_type::FileTypeExt, parquet::ParquetFormat,
+        },
         execution::options::ReadOptions,
         logical_expr::{col, lit},
         test::{columns, object_store::register_test_store},
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index 40aeccf233..9c438a4794 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -30,6 +30,7 @@ use datafusion_expr::CreateExternalTable;
 use crate::datasource::file_format::arrow::ArrowFormat;
 use crate::datasource::file_format::avro::AvroFormat;
 use crate::datasource::file_format::csv::CsvFormat;
+use crate::datasource::file_format::file_compression_type::FileCompressionType;
 use crate::datasource::file_format::json::JsonFormat;
 use crate::datasource::file_format::parquet::ParquetFormat;
 use crate::datasource::file_format::FileFormat;
@@ -39,7 +40,7 @@ use crate::datasource::listing::{
 use crate::datasource::provider::TableProviderFactory;
 use crate::datasource::TableProvider;
 use crate::execution::context::SessionState;
-use datafusion_common::{FileCompressionType, FileType};
+use datafusion_common::FileType;
 
 use super::listing::ListingTableInsertMode;
 
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs
index aedc9198a8..dfc6acdde0 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -17,6 +17,7 @@
 
 //! Execution plan for reading CSV files
 
+use crate::datasource::file_format::file_compression_type::FileCompressionType;
 use crate::datasource::listing::{FileRange, ListingTableUrl};
 use crate::datasource::physical_plan::file_stream::{
     FileOpenFuture, FileOpener, FileStream,
@@ -31,7 +32,6 @@ use crate::physical_plan::{
 };
 use arrow::csv;
 use arrow::datatypes::SchemaRef;
-use datafusion_common::FileCompressionType;
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{
     ordering_equivalence_properties_helper, LexOrdering, OrderingEquivalenceProperties,
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs
index d5c3a5635a..537855704a 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 //! Execution plan for reading line-delimited JSON files
+use crate::datasource::file_format::file_compression_type::FileCompressionType;
 use crate::datasource::listing::ListingTableUrl;
 use crate::datasource::physical_plan::file_stream::{
     FileOpenFuture, FileOpener, FileStream,
@@ -28,7 +29,6 @@ use crate::physical_plan::{
     DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
     Statistics,
 };
-use datafusion_common::FileCompressionType;
 use datafusion_execution::TaskContext;
 
 use arrow::json::ReaderBuilder;
@@ -318,6 +318,7 @@ mod tests {
 
     use crate::assert_batches_eq;
     use crate::dataframe::DataFrameWriteOptions;
+    use crate::datasource::file_format::file_compression_type::FileTypeExt;
     use crate::datasource::file_format::{json::JsonFormat, FileFormat};
     use crate::datasource::listing::PartitionedFile;
     use crate::datasource::object_store::ObjectStoreUrl;
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 565f76affa..1279749df1 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -1672,6 +1672,7 @@ mod tests {
     use std::ops::Deref;
 
     use super::*;
+    use crate::datasource::file_format::file_compression_type::FileCompressionType;
     use crate::datasource::listing::PartitionedFile;
     use crate::datasource::object_store::ObjectStoreUrl;
     use crate::datasource::physical_plan::{FileScanConfig, ParquetExec};
@@ -1694,7 +1695,7 @@ mod tests {
     use crate::physical_plan::sorts::sort::SortExec;
     use arrow::compute::SortOptions;
     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-    use datafusion_common::{FileCompressionType, ScalarValue};
+    use datafusion_common::ScalarValue;
     use datafusion_expr::logical_plan::JoinType;
     use datafusion_expr::Operator;
     use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index b406a54105..e223e43cb3 100644
--- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -281,11 +281,11 @@ mod tests {
 
     use crate::prelude::SessionConfig;
 
+    use crate::datasource::file_format::file_compression_type::FileCompressionType;
     use crate::datasource::listing::PartitionedFile;
     use crate::datasource::physical_plan::{CsvExec, FileScanConfig};
     use crate::physical_plan::coalesce_batches::CoalesceBatchesExec;
     use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
-    use datafusion_common::FileCompressionType;
 
     use crate::physical_plan::filter::FilterExec;
     use crate::physical_plan::joins::{HashJoinExec, PartitionMode};
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index a26be4857d..903542ca3f 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -17,6 +17,9 @@
 
 //! Common unit test utility methods
 
+use crate::datasource::file_format::file_compression_type::{
+    FileCompressionType, FileTypeExt,
+};
 use crate::datasource::listing::PartitionedFile;
 use crate::datasource::object_store::ObjectStoreUrl;
 use crate::datasource::physical_plan::{CsvExec, FileScanConfig};
@@ -34,8 +37,8 @@ use arrow::record_batch::RecordBatch;
 use bzip2::write::BzEncoder;
 #[cfg(feature = "compression")]
 use bzip2::Compression as BzCompression;
+use datafusion_common::FileType;
 use datafusion_common::{DataFusionError, Statistics};
-use datafusion_common::{FileCompressionType, FileType};
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_physical_expr::{Partitioning, PhysicalSortExpr};
 use datafusion_physical_plan::{DisplayAs, DisplayFormatType};
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index 71d5a35569..249abf8e70 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -21,6 +21,7 @@ use std::sync::Arc;
 
 use datafusion::arrow::compute::SortOptions;
 use datafusion::arrow::datatypes::SchemaRef;
+use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
 use datafusion::datasource::physical_plan::{AvroExec, CsvExec, ParquetExec};
 use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::execution::FunctionRegistry;
@@ -47,7 +48,6 @@ use datafusion::physical_plan::windows::{create_window_expr, WindowAggExec};
 use datafusion::physical_plan::{
     udaf, AggregateExpr, ExecutionPlan, Partitioning, PhysicalExpr, WindowExpr,
 };
-use datafusion_common::FileCompressionType;
 use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result};
 use prost::bytes::BufMut;
 use prost::Message;

