This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new b8a3a78f8 Compressed CSV/JSON support (#3642)
b8a3a78f8 is described below

commit b8a3a78f833fae8faace8d7542a1fb3d7a497b6a
Author: Rito Takeuchi <[email protected]>
AuthorDate: Wed Oct 12 00:27:12 2022 +0900

    Compressed CSV/JSON support (#3642)
    
    * Compression text support
    
    * Fix the path joining issue on Windows test
    
    * Debug code for Windows CI
    
    * Utilize `std::path::Path`, instead of `url::Url`
---
 .../examples/parquet_sql_multiple_files.rs         |   7 +-
 datafusion/common/src/error.rs                     |   6 +
 datafusion/core/Cargo.toml                         |   5 +
 datafusion/core/src/datasource/file_format/csv.rs  |  27 +-
 .../core/src/datasource/file_format/file_type.rs   | 331 +++++++++++++++++++++
 datafusion/core/src/datasource/file_format/json.rs |  25 +-
 datafusion/core/src/datasource/file_format/mod.rs  |   1 +
 datafusion/core/src/datasource/listing/table.rs    |  57 ++--
 datafusion/core/src/execution/context.rs           |  59 ++--
 datafusion/core/src/execution/options.rs           |  55 +++-
 .../core/src/physical_plan/file_format/csv.rs      | 156 ++++++++--
 .../core/src/physical_plan/file_format/json.rs     | 192 +++++++++---
 datafusion/core/src/test/mod.rs                    | 134 ++++++---
 datafusion/expr/src/logical_plan/plan.rs           |   2 +
 datafusion/proto/proto/datafusion.proto            |   1 +
 datafusion/proto/src/logical_plan.rs               |   3 +
 datafusion/sql/src/parser.rs                       |  53 ++++
 datafusion/sql/src/planner.rs                      |  39 +++
 18 files changed, 976 insertions(+), 177 deletions(-)

diff --git a/datafusion-examples/examples/parquet_sql_multiple_files.rs 
b/datafusion-examples/examples/parquet_sql_multiple_files.rs
index 2b7a5f6fe..bf5bd94a5 100644
--- a/datafusion-examples/examples/parquet_sql_multiple_files.rs
+++ b/datafusion-examples/examples/parquet_sql_multiple_files.rs
@@ -15,9 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use datafusion::datasource::file_format::parquet::{
-    ParquetFormat, DEFAULT_PARQUET_EXTENSION,
-};
+use datafusion::datasource::file_format::file_type::{FileType, GetExt};
+use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::listing::ListingOptions;
 use datafusion::error::Result;
 use datafusion::prelude::*;
@@ -35,7 +34,7 @@ async fn main() -> Result<()> {
     // Configure listing options
     let file_format = ParquetFormat::default().with_enable_pruning(true);
     let listing_options = ListingOptions {
-        file_extension: DEFAULT_PARQUET_EXTENSION.to_owned(),
+        file_extension: FileType::PARQUET.get_ext(),
         format: Arc::new(file_format),
         table_partition_cols: vec![],
         collect_stat: true,
diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs
index 9157e04a2..c2d3e389c 100644
--- a/datafusion/common/src/error.rs
+++ b/datafusion/common/src/error.rs
@@ -318,6 +318,12 @@ impl Display for DataFusionError {
 
 impl error::Error for DataFusionError {}
 
+impl From<DataFusionError> for io::Error {
+    fn from(e: DataFusionError) -> Self {
+        io::Error::new(io::ErrorKind::Other, e)
+    }
+}
+
 #[cfg(test)]
 mod test {
     use crate::error::DataFusionError;
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index f7a9ce4e9..44a5cbe1a 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -57,8 +57,10 @@ unicode_expressions = 
["datafusion-physical-expr/regex_expressions", "datafusion
 ahash = { version = "0.8", default-features = false, features = 
["runtime-rng"] }
 apache-avro = { version = "0.14", optional = true }
 arrow = { version = "24.0.0", features = ["prettyprint"] }
+async-compression = { version = "0.3.14", features = ["bzip2", "gzip", 
"futures-io", "tokio"] }
 async-trait = "0.1.41"
 bytes = "1.1"
+bzip2 = "0.4.3"
 chrono = { version = "0.4", default-features = false }
 datafusion-common = { path = "../common", version = "13.0.0", features = 
["parquet", "object_store"] }
 datafusion-expr = { path = "../expr", version = "13.0.0" }
@@ -67,6 +69,7 @@ datafusion-optimizer = { path = "../optimizer", version = 
"13.0.0" }
 datafusion-physical-expr = { path = "../physical-expr", version = "13.0.0" }
 datafusion-row = { path = "../row", version = "13.0.0" }
 datafusion-sql = { path = "../sql", version = "13.0.0" }
+flate2 = "1.0.24"
 futures = "0.3"
 glob = "0.3.0"
 hashbrown = { version = "0.12", features = ["raw"] }
@@ -90,6 +93,7 @@ sqlparser = "0.25"
 tempfile = "3"
 tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", 
"sync", "fs", "parking_lot"] }
 tokio-stream = "0.1"
+tokio-util = { version = "0.7.4", features = ["io"] }
 url = "2.2"
 uuid = { version = "1.0", features = ["v4"] }
 
@@ -102,6 +106,7 @@ ctor = "0.1.22"
 doc-comment = "0.3"
 env_logger = "0.9"
 fuzz-utils = { path = "fuzz-utils" }
+rstest = "0.15.0"
 
 [[bench]]
 harness = false
diff --git a/datafusion/core/src/datasource/file_format/csv.rs 
b/datafusion/core/src/datasource/file_format/csv.rs
index 2e6994f4b..6a99e35b8 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -18,16 +18,21 @@
 //! CSV format abstractions
 
 use std::any::Any;
+
 use std::sync::Arc;
 
 use arrow::datatypes::Schema;
 use arrow::{self, datatypes::SchemaRef};
 use async_trait::async_trait;
+use bytes::Buf;
+
 use datafusion_common::DataFusionError;
+
 use futures::TryFutureExt;
 use object_store::{ObjectMeta, ObjectStore};
 
 use super::FileFormat;
+use crate::datasource::file_format::file_type::FileCompressionType;
 use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
 use crate::error::Result;
 use crate::logical_plan::Expr;
@@ -43,6 +48,7 @@ pub struct CsvFormat {
     has_header: bool,
     delimiter: u8,
     schema_infer_max_rec: Option<usize>,
+    file_compression_type: FileCompressionType,
 }
 
 impl Default for CsvFormat {
@@ -51,6 +57,7 @@ impl Default for CsvFormat {
             schema_infer_max_rec: Some(DEFAULT_SCHEMA_INFER_MAX_RECORD),
             has_header: true,
             delimiter: b',',
+            file_compression_type: FileCompressionType::UNCOMPRESSED,
         }
     }
 }
@@ -82,6 +89,16 @@ impl CsvFormat {
         self
     }
 
+    /// Set the `FileCompressionType` used to decode CSV files
+    /// - defaults to `FileCompressionType::UNCOMPRESSED`
+    pub fn with_file_compression_type(
+        mut self,
+        file_compression_type: FileCompressionType,
+    ) -> Self {
+        self.file_compression_type = file_compression_type;
+        self
+    }
+
     /// The delimiter character.
     pub fn delimiter(&self) -> u8 {
         self.delimiter
@@ -110,8 +127,9 @@ impl FileFormat for CsvFormat {
                 .await
                 .map_err(|e| DataFusionError::External(Box::new(e)))?;
 
+            let decoder = 
self.file_compression_type.convert_read(data.reader());
             let (schema, records_read) = 
arrow::csv::reader::infer_reader_schema(
-                &mut data.as_ref(),
+                decoder,
                 self.delimiter,
                 Some(records_to_read),
                 self.has_header,
@@ -144,7 +162,12 @@ impl FileFormat for CsvFormat {
         conf: FileScanConfig,
         _filters: &[Expr],
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let exec = CsvExec::new(conf, self.has_header, self.delimiter);
+        let exec = CsvExec::new(
+            conf,
+            self.has_header,
+            self.delimiter,
+            self.file_compression_type.to_owned(),
+        );
         Ok(Arc::new(exec))
     }
 }
diff --git a/datafusion/core/src/datasource/file_format/file_type.rs 
b/datafusion/core/src/datasource/file_format/file_type.rs
new file mode 100644
index 000000000..f08a21ca1
--- /dev/null
+++ b/datafusion/core/src/datasource/file_format/file_type.rs
@@ -0,0 +1,331 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! File type abstraction
+
+use crate::error::{DataFusionError, Result};
+use std::io::Error;
+
+use async_compression::tokio::bufread::{
+    BzDecoder as AsyncBzDecoder, GzipDecoder as AsyncGzDecoder,
+};
+use bzip2::read::BzDecoder;
+
+use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION;
+use crate::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
+use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION;
+use crate::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
+use bytes::Bytes;
+use flate2::read::GzDecoder;
+use futures::{Stream, TryStreamExt};
+use std::str::FromStr;
+use tokio_util::io::{ReaderStream, StreamReader};
+
+/// Define each `FileType`/`FileCompressionType`'s extension
+pub trait GetExt {
+    /// File extension getter
+    fn get_ext(&self) -> String;
+}
+
+/// Readable file compression type
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum FileCompressionType {
+    /// Gzip-ed file
+    GZIP,
+    /// Bzip2-ed file
+    BZIP2,
+    /// Uncompressed file
+    UNCOMPRESSED,
+}
+
+impl GetExt for FileCompressionType {
+    fn get_ext(&self) -> String {
+        match self {
+            FileCompressionType::GZIP => ".gz".to_owned(),
+            FileCompressionType::BZIP2 => ".bz2".to_owned(),
+            FileCompressionType::UNCOMPRESSED => "".to_owned(),
+        }
+    }
+}
+
+impl FromStr for FileCompressionType {
+    type Err = DataFusionError;
+
+    fn from_str(s: &str) -> Result<Self> {
+        let s = s.to_uppercase();
+        match s.as_str() {
+            "GZIP" | "GZ" => Ok(FileCompressionType::GZIP),
+            "BZIP2" | "BZ2" => Ok(FileCompressionType::BZIP2),
+            "" => Ok(FileCompressionType::UNCOMPRESSED),
+            _ => Err(DataFusionError::NotImplemented(format!(
+                "Unknown FileCompressionType: {}",
+                s
+            ))),
+        }
+    }
+}
+
+/// `FileCompressionType` implementation
+impl FileCompressionType {
+    /// Given a `Stream`, create a `Stream` whose data is decompressed according to 
this `FileCompressionType`.
+    pub fn convert_stream<T: Stream<Item = Result<Bytes>> + Unpin + Send + 
'static>(
+        &self,
+        s: T,
+    ) -> Box<dyn Stream<Item = Result<Bytes>> + Send + Unpin> {
+        let err_converter = |e: Error| match e
+            .get_ref()
+            .and_then(|e| e.downcast_ref::<DataFusionError>())
+        {
+            Some(_) => {
+                *(e.into_inner()
+                    .unwrap()
+                    .downcast::<DataFusionError>()
+                    .unwrap())
+            }
+            None => Into::<DataFusionError>::into(e),
+        };
+
+        match self {
+            FileCompressionType::GZIP => Box::new(
+                ReaderStream::new(AsyncGzDecoder::new(StreamReader::new(s)))
+                    .map_err(err_converter),
+            ),
+            FileCompressionType::BZIP2 => Box::new(
+                ReaderStream::new(AsyncBzDecoder::new(StreamReader::new(s)))
+                    .map_err(err_converter),
+            ),
+            FileCompressionType::UNCOMPRESSED => Box::new(s),
+        }
+    }
+
+    /// Given a `Read`, create a `Read` whose data is decompressed according to 
this `FileCompressionType`.
+    pub fn convert_read<T: std::io::Read + Send + 'static>(
+        &self,
+        r: T,
+    ) -> Box<dyn std::io::Read + Send> {
+        match self {
+            FileCompressionType::GZIP => Box::new(GzDecoder::new(r)),
+            FileCompressionType::BZIP2 => Box::new(BzDecoder::new(r)),
+            FileCompressionType::UNCOMPRESSED => Box::new(r),
+        }
+    }
+}
+
+/// Readable file type
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum FileType {
+    /// Apache Avro file
+    AVRO,
+    /// Apache Parquet file
+    PARQUET,
+    /// CSV file
+    CSV,
+    /// JSON file
+    JSON,
+}
+
+impl GetExt for FileType {
+    fn get_ext(&self) -> String {
+        match self {
+            FileType::AVRO => DEFAULT_AVRO_EXTENSION.to_owned(),
+            FileType::PARQUET => DEFAULT_PARQUET_EXTENSION.to_owned(),
+            FileType::CSV => DEFAULT_CSV_EXTENSION.to_owned(),
+            FileType::JSON => DEFAULT_JSON_EXTENSION.to_owned(),
+        }
+    }
+}
+
+impl FromStr for FileType {
+    type Err = DataFusionError;
+
+    fn from_str(s: &str) -> Result<Self> {
+        let s = s.to_uppercase();
+        match s.as_str() {
+            "AVRO" => Ok(FileType::AVRO),
+            "PARQUET" => Ok(FileType::PARQUET),
+            "CSV" => Ok(FileType::CSV),
+            "JSON" => Ok(FileType::JSON),
+            _ => Err(DataFusionError::NotImplemented(format!(
+                "Unknown FileType: {}",
+                s
+            ))),
+        }
+    }
+}
+
+impl FileType {
+    /// Given a `FileCompressionType`, return the `FileType`'s extension with 
compression suffix
+    pub fn get_ext_with_compression(&self, c: FileCompressionType) -> 
Result<String> {
+        let ext = self.get_ext();
+
+        match self {
+            FileType::JSON | FileType::CSV => Ok(format!("{}{}", ext, 
c.get_ext())),
+            FileType::PARQUET | FileType::AVRO => match c {
+                FileCompressionType::UNCOMPRESSED => Ok(ext),
+                _ => Err(DataFusionError::Internal(
+                    "FileCompressionType can be specified for CSV/JSON 
FileType.".into(),
+                )),
+            },
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::datasource::file_format::file_type::{FileCompressionType, 
FileType};
+    use crate::error::DataFusionError;
+    use std::str::FromStr;
+
+    #[test]
+    fn get_ext_with_compression() {
+        let file_type = FileType::CSV;
+        assert_eq!(
+            file_type
+                .get_ext_with_compression(FileCompressionType::UNCOMPRESSED)
+                .unwrap(),
+            ".csv"
+        );
+        assert_eq!(
+            file_type
+                .get_ext_with_compression(FileCompressionType::GZIP)
+                .unwrap(),
+            ".csv.gz"
+        );
+        assert_eq!(
+            file_type
+                .get_ext_with_compression(FileCompressionType::BZIP2)
+                .unwrap(),
+            ".csv.bz2"
+        );
+
+        let file_type = FileType::JSON;
+        assert_eq!(
+            file_type
+                .get_ext_with_compression(FileCompressionType::UNCOMPRESSED)
+                .unwrap(),
+            ".json"
+        );
+        assert_eq!(
+            file_type
+                .get_ext_with_compression(FileCompressionType::GZIP)
+                .unwrap(),
+            ".json.gz"
+        );
+        assert_eq!(
+            file_type
+                .get_ext_with_compression(FileCompressionType::BZIP2)
+                .unwrap(),
+            ".json.bz2"
+        );
+
+        let file_type = FileType::AVRO;
+        assert_eq!(
+            file_type
+                .get_ext_with_compression(FileCompressionType::UNCOMPRESSED)
+                .unwrap(),
+            ".avro"
+        );
+        assert!(matches!(
+            file_type.get_ext_with_compression(FileCompressionType::GZIP),
+            Err(DataFusionError::Internal(_))
+        ));
+        assert!(matches!(
+            file_type.get_ext_with_compression(FileCompressionType::BZIP2),
+            Err(DataFusionError::Internal(_))
+        ));
+
+        let file_type = FileType::PARQUET;
+        assert_eq!(
+            file_type
+                .get_ext_with_compression(FileCompressionType::UNCOMPRESSED)
+                .unwrap(),
+            ".parquet"
+        );
+        assert!(matches!(
+            file_type.get_ext_with_compression(FileCompressionType::GZIP),
+            Err(DataFusionError::Internal(_))
+        ));
+        assert!(matches!(
+            file_type.get_ext_with_compression(FileCompressionType::BZIP2),
+            Err(DataFusionError::Internal(_))
+        ));
+    }
+
+    #[test]
+    fn from_str() {
+        assert_eq!(FileType::from_str("csv").unwrap(), FileType::CSV);
+        assert_eq!(FileType::from_str("CSV").unwrap(), FileType::CSV);
+
+        assert_eq!(FileType::from_str("json").unwrap(), FileType::JSON);
+        assert_eq!(FileType::from_str("JSON").unwrap(), FileType::JSON);
+
+        assert_eq!(FileType::from_str("avro").unwrap(), FileType::AVRO);
+        assert_eq!(FileType::from_str("AVRO").unwrap(), FileType::AVRO);
+
+        assert_eq!(FileType::from_str("parquet").unwrap(), FileType::PARQUET);
+        assert_eq!(FileType::from_str("PARQUET").unwrap(), FileType::PARQUET);
+
+        assert!(matches!(
+            FileType::from_str("Unknown"),
+            Err(DataFusionError::NotImplemented(_))
+        ));
+
+        assert_eq!(
+            FileCompressionType::from_str("gz").unwrap(),
+            FileCompressionType::GZIP
+        );
+        assert_eq!(
+            FileCompressionType::from_str("GZ").unwrap(),
+            FileCompressionType::GZIP
+        );
+        assert_eq!(
+            FileCompressionType::from_str("gzip").unwrap(),
+            FileCompressionType::GZIP
+        );
+        assert_eq!(
+            FileCompressionType::from_str("GZIP").unwrap(),
+            FileCompressionType::GZIP
+        );
+
+        assert_eq!(
+            FileCompressionType::from_str("bz2").unwrap(),
+            FileCompressionType::BZIP2
+        );
+        assert_eq!(
+            FileCompressionType::from_str("BZ2").unwrap(),
+            FileCompressionType::BZIP2
+        );
+        assert_eq!(
+            FileCompressionType::from_str("bzip2").unwrap(),
+            FileCompressionType::BZIP2
+        );
+        assert_eq!(
+            FileCompressionType::from_str("BZIP2").unwrap(),
+            FileCompressionType::BZIP2
+        );
+
+        assert_eq!(
+            FileCompressionType::from_str("").unwrap(),
+            FileCompressionType::UNCOMPRESSED
+        );
+
+        assert!(matches!(
+            FileCompressionType::from_str("Unknown"),
+            Err(DataFusionError::NotImplemented(_))
+        ));
+    }
+}
diff --git a/datafusion/core/src/datasource/file_format/json.rs 
b/datafusion/core/src/datasource/file_format/json.rs
index 9a889ab4c..02a684e85 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -18,6 +18,7 @@
 //! Line delimited JSON format abstractions
 
 use std::any::Any;
+
 use std::io::BufReader;
 use std::sync::Arc;
 
@@ -26,10 +27,13 @@ use arrow::datatypes::SchemaRef;
 use arrow::json::reader::infer_json_schema_from_iterator;
 use arrow::json::reader::ValueIter;
 use async_trait::async_trait;
+use bytes::Buf;
+
 use object_store::{GetResult, ObjectMeta, ObjectStore};
 
 use super::FileFormat;
 use super::FileScanConfig;
+use crate::datasource::file_format::file_type::FileCompressionType;
 use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
 use crate::error::Result;
 use crate::logical_plan::Expr;
@@ -43,12 +47,14 @@ pub const DEFAULT_JSON_EXTENSION: &str = ".json";
 #[derive(Debug)]
 pub struct JsonFormat {
     schema_infer_max_rec: Option<usize>,
+    file_compression_type: FileCompressionType,
 }
 
 impl Default for JsonFormat {
     fn default() -> Self {
         Self {
             schema_infer_max_rec: Some(DEFAULT_SCHEMA_INFER_MAX_RECORD),
+            file_compression_type: FileCompressionType::UNCOMPRESSED,
         }
     }
 }
@@ -60,6 +66,16 @@ impl JsonFormat {
         self.schema_infer_max_rec = max_rec;
         self
     }
+
+    /// Set the `FileCompressionType` used to decode JSON files
+    /// - defaults to `FileCompressionType::UNCOMPRESSED`
+    pub fn with_file_compression_type(
+        mut self,
+        file_compression_type: FileCompressionType,
+    ) -> Self {
+        self.file_compression_type = file_compression_type;
+        self
+    }
 }
 
 #[async_trait]
@@ -75,6 +91,7 @@ impl FileFormat for JsonFormat {
     ) -> Result<SchemaRef> {
         let mut schemas = Vec::new();
         let mut records_to_read = 
self.schema_infer_max_rec.unwrap_or(usize::MAX);
+        let file_compression_type = self.file_compression_type.to_owned();
         for object in objects {
             let mut take_while = || {
                 let should_take = records_to_read > 0;
@@ -86,13 +103,15 @@ impl FileFormat for JsonFormat {
 
             let schema = match store.get(&object.location).await? {
                 GetResult::File(file, _) => {
-                    let mut reader = BufReader::new(file);
+                    let decoder = file_compression_type.convert_read(file);
+                    let mut reader = BufReader::new(decoder);
                     let iter = ValueIter::new(&mut reader, None);
                     infer_json_schema_from_iterator(iter.take_while(|_| 
take_while()))?
                 }
                 r @ GetResult::Stream(_) => {
                     let data = r.bytes().await?;
-                    let mut reader = BufReader::new(data.as_ref());
+                    let decoder = 
file_compression_type.convert_read(data.reader());
+                    let mut reader = BufReader::new(decoder);
                     let iter = ValueIter::new(&mut reader, None);
                     infer_json_schema_from_iterator(iter.take_while(|_| 
take_while()))?
                 }
@@ -122,7 +141,7 @@ impl FileFormat for JsonFormat {
         conf: FileScanConfig,
         _filters: &[Expr],
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let exec = NdJsonExec::new(conf);
+        let exec = NdJsonExec::new(conf, 
self.file_compression_type.to_owned());
         Ok(Arc::new(exec))
     }
 }
diff --git a/datafusion/core/src/datasource/file_format/mod.rs 
b/datafusion/core/src/datasource/file_format/mod.rs
index b16525c7a..7b9421bc7 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -22,6 +22,7 @@ pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000;
 
 pub mod avro;
 pub mod csv;
+pub mod file_type;
 pub mod json;
 pub mod parquet;
 
diff --git a/datafusion/core/src/datasource/listing/table.rs 
b/datafusion/core/src/datasource/listing/table.rs
index f6c617f0c..72f5b9827 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -18,6 +18,7 @@
 //! The table implementation.
 
 use ahash::HashMap;
+use std::str::FromStr;
 use std::{any::Any, sync::Arc};
 
 use arrow::datatypes::{Field, Schema, SchemaRef};
@@ -27,6 +28,7 @@ use object_store::path::Path;
 use object_store::ObjectMeta;
 use parking_lot::RwLock;
 
+use crate::datasource::file_format::file_type::{FileCompressionType, FileType};
 use crate::datasource::{
     file_format::{
         avro::AvroFormat, csv::CsvFormat, json::JsonFormat, 
parquet::ParquetFormat,
@@ -102,17 +104,39 @@ impl ListingTableConfig {
         }
     }
 
-    fn infer_format(suffix: &str) -> Result<Arc<dyn FileFormat>> {
-        match suffix {
-            "avro" => Ok(Arc::new(AvroFormat::default())),
-            "csv" => Ok(Arc::new(CsvFormat::default())),
-            "json" => Ok(Arc::new(JsonFormat::default())),
-            "parquet" => Ok(Arc::new(ParquetFormat::default())),
-            _ => Err(DataFusionError::Internal(format!(
-                "Unable to infer file type from suffix {}",
-                suffix
-            ))),
+    fn infer_format(path: &str) -> Result<(Arc<dyn FileFormat>, String)> {
+        let err_msg = format!("Unable to infer file type from path: {}", path);
+
+        let mut exts = path.rsplit('.');
+
+        let mut splitted = exts.next().unwrap_or("");
+
+        let file_compression_type = FileCompressionType::from_str(splitted)
+            .unwrap_or(FileCompressionType::UNCOMPRESSED);
+
+        if file_compression_type != FileCompressionType::UNCOMPRESSED {
+            splitted = exts.next().unwrap_or("");
         }
+
+        let file_type = FileType::from_str(splitted)
+            .map_err(|_| DataFusionError::Internal(err_msg.to_owned()))?;
+
+        let ext = file_type
+            .get_ext_with_compression(file_compression_type.to_owned())
+            .map_err(|_| DataFusionError::Internal(err_msg))?;
+
+        let file_format: Arc<dyn FileFormat> = match file_type {
+            FileType::AVRO => Arc::new(AvroFormat::default()),
+            FileType::CSV => Arc::new(
+                
CsvFormat::default().with_file_compression_type(file_compression_type),
+            ),
+            FileType::JSON => Arc::new(
+                
JsonFormat::default().with_file_compression_type(file_compression_type),
+            ),
+            FileType::PARQUET => Arc::new(ParquetFormat::default()),
+        };
+
+        Ok((file_format, ext))
     }
 
     /// Infer `ListingOptions` based on `table_path` suffix.
@@ -130,16 +154,13 @@ impl ListingTableConfig {
             .await
             .ok_or_else(|| DataFusionError::Internal("No files for 
table".into()))??;
 
-        let file_type = 
file.location.as_ref().rsplit('.').next().ok_or_else(|| {
-            DataFusionError::Internal("Unable to infer file suffix".into())
-        })?;
-
-        let format = ListingTableConfig::infer_format(file_type)?;
+        let (format, file_extension) =
+            ListingTableConfig::infer_format(file.location.as_ref())?;
 
         let listing_options = ListingOptions {
             format,
             collect_stat: true,
-            file_extension: file_type.to_string(),
+            file_extension,
             target_partitions: ctx.config.target_partitions,
             table_partition_cols: vec![],
         };
@@ -474,7 +495,7 @@ impl ListingTable {
 
 #[cfg(test)]
 mod tests {
-    use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION;
+    use crate::datasource::file_format::file_type::GetExt;
     use crate::prelude::SessionContext;
     use crate::{
         datasource::file_format::{avro::AvroFormat, parquet::ParquetFormat},
@@ -537,7 +558,7 @@ mod tests {
         register_test_store(&ctx, &[(&path, 100)]);
 
         let opt = ListingOptions {
-            file_extension: DEFAULT_AVRO_EXTENSION.to_owned(),
+            file_extension: FileType::AVRO.get_ext(),
             format: Arc::new(AvroFormat {}),
             table_partition_cols: vec![String::from("p1")],
             target_partitions: 4,
diff --git a/datafusion/core/src/execution/context.rs 
b/datafusion/core/src/execution/context.rs
index d0baa035e..35670f21f 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -24,10 +24,7 @@ use crate::{
     datasource::listing::{ListingOptions, ListingTable},
     datasource::{
         file_format::{
-            avro::{AvroFormat, DEFAULT_AVRO_EXTENSION},
-            csv::{CsvFormat, DEFAULT_CSV_EXTENSION},
-            json::{JsonFormat, DEFAULT_JSON_EXTENSION},
-            parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION},
+            avro::AvroFormat, csv::CsvFormat, json::JsonFormat, 
parquet::ParquetFormat,
             FileFormat,
         },
         MemTable, ViewTable,
@@ -42,6 +39,7 @@ use crate::{
 pub use datafusion_physical_expr::execution_props::ExecutionProps;
 use datafusion_physical_expr::var_provider::is_system_variables;
 use parking_lot::RwLock;
+use std::str::FromStr;
 use std::sync::Arc;
 use std::{
     any::{Any, TypeId},
@@ -81,6 +79,7 @@ use crate::config::{
     OPT_FILTER_NULL_JOIN_KEYS, OPT_OPTIMIZER_SKIP_FAILED_RULES,
 };
 use crate::datasource::datasource::TableProviderFactory;
+use crate::datasource::file_format::file_type::{FileCompressionType, FileType};
 use crate::execution::runtime_env::RuntimeEnv;
 use crate::logical_expr::Explain;
 use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, 
plan_to_parquet};
@@ -100,6 +99,7 @@ use datafusion_sql::{
     planner::{ContextProvider, SqlToRel},
 };
 use parquet::file::properties::WriterProperties;
+
 use uuid::Uuid;
 
 use super::options::{
@@ -448,31 +448,38 @@ impl SessionContext {
         &self,
         cmd: &CreateExternalTable,
     ) -> Result<Arc<DataFrame>> {
-        let (file_format, file_extension) = match cmd.file_type.as_str() {
-            "CSV" => (
-                Arc::new(
-                    CsvFormat::default()
-                        .with_has_header(cmd.has_header)
-                        .with_delimiter(cmd.delimiter as u8),
-                ) as Arc<dyn FileFormat>,
-                DEFAULT_CSV_EXTENSION,
-            ),
-            "PARQUET" => (
-                Arc::new(ParquetFormat::default()) as Arc<dyn FileFormat>,
-                DEFAULT_PARQUET_EXTENSION,
-            ),
-            "AVRO" => (
-                Arc::new(AvroFormat::default()) as Arc<dyn FileFormat>,
-                DEFAULT_AVRO_EXTENSION,
-            ),
-            "JSON" => (
-                Arc::new(JsonFormat::default()) as Arc<dyn FileFormat>,
-                DEFAULT_JSON_EXTENSION,
-            ),
-            _ => Err(DataFusionError::Execution(
+        let file_compression_type =
+            match 
FileCompressionType::from_str(cmd.file_compression_type.as_str()) {
+                Ok(t) => t,
+                Err(_) => Err(DataFusionError::Execution(
+                    "Only known FileCompressionTypes can be 
ListingTables!".to_string(),
+                ))?,
+            };
+
+        let file_type = match FileType::from_str(cmd.file_type.as_str()) {
+            Ok(t) => t,
+            Err(_) => Err(DataFusionError::Execution(
                 "Only known FileTypes can be ListingTables!".to_string(),
             ))?,
         };
+
+        let file_extension =
+            
file_type.get_ext_with_compression(file_compression_type.to_owned())?;
+
+        let file_format: Arc<dyn FileFormat> = match file_type {
+            FileType::CSV => Arc::new(
+                CsvFormat::default()
+                    .with_has_header(cmd.has_header)
+                    .with_delimiter(cmd.delimiter as u8)
+                    .with_file_compression_type(file_compression_type),
+            ),
+            FileType::PARQUET => Arc::new(ParquetFormat::default()),
+            FileType::AVRO => Arc::new(AvroFormat::default()),
+            FileType::JSON => Arc::new(
+                
JsonFormat::default().with_file_compression_type(file_compression_type),
+            ),
+        };
+
         let table = self.table(cmd.name.as_str());
         match (cmd.if_not_exists, table) {
             (true, Ok(_)) => self.return_empty_dataframe(),
diff --git a/datafusion/core/src/execution/options.rs 
b/datafusion/core/src/execution/options.rs
index e296d18b6..9ddd3f1d6 100644
--- a/datafusion/core/src/execution/options.rs
+++ b/datafusion/core/src/execution/options.rs
@@ -21,13 +21,15 @@ use std::sync::Arc;
 
 use arrow::datatypes::{Schema, SchemaRef};
 
+use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION;
+use crate::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
+use crate::datasource::file_format::file_type::FileCompressionType;
+use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION;
+use crate::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
 use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
 use crate::datasource::{
     file_format::{
-        avro::{AvroFormat, DEFAULT_AVRO_EXTENSION},
-        csv::{CsvFormat, DEFAULT_CSV_EXTENSION},
-        json::{JsonFormat, DEFAULT_JSON_EXTENSION},
-        parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION},
+        avro::AvroFormat, csv::CsvFormat, json::JsonFormat, 
parquet::ParquetFormat,
     },
     listing::ListingOptions,
 };
@@ -48,10 +50,13 @@ pub struct CsvReadOptions<'a> {
     /// Max number of rows to read from CSV files for schema inference if 
needed. Defaults to `DEFAULT_SCHEMA_INFER_MAX_RECORD`.
     pub schema_infer_max_records: usize,
     /// File extension; only files with this extension are selected for data 
input.
-    /// Defaults to DEFAULT_CSV_EXTENSION.
+    /// Defaults to `FileType::CSV.get_ext().as_str()`.
     pub file_extension: &'a str,
     /// Partition Columns
     pub table_partition_cols: Vec<String>,
+
+    /// File compression type
+    pub file_compression_type: FileCompressionType,
 }
 
 impl<'a> Default for CsvReadOptions<'a> {
@@ -70,6 +75,7 @@ impl<'a> CsvReadOptions<'a> {
             delimiter: b',',
             file_extension: DEFAULT_CSV_EXTENSION,
             table_partition_cols: vec![],
+            file_compression_type: FileCompressionType::UNCOMPRESSED,
         }
     }
 
@@ -117,12 +123,22 @@ impl<'a> CsvReadOptions<'a> {
         self
     }
 
+    /// Configure file compression type
+    pub fn file_compression_type(
+        mut self,
+        file_compression_type: FileCompressionType,
+    ) -> Self {
+        self.file_compression_type = file_compression_type;
+        self
+    }
+
     /// Helper to convert these user facing options to `ListingTable` options
     pub fn to_listing_options(&self, target_partitions: usize) -> 
ListingOptions {
         let file_format = CsvFormat::default()
             .with_has_header(self.has_header)
             .with_delimiter(self.delimiter)
-            .with_schema_infer_max_rec(Some(self.schema_infer_max_records));
+            .with_schema_infer_max_rec(Some(self.schema_infer_max_records))
+            .with_file_compression_type(self.file_compression_type.to_owned());
 
         ListingOptions {
             format: Arc::new(file_format),
@@ -154,6 +170,7 @@ pub struct ParquetReadOptions<'a> {
 impl<'a> Default for ParquetReadOptions<'a> {
     fn default() -> Self {
         let format_default = ParquetFormat::default();
+
         Self {
             file_extension: DEFAULT_PARQUET_EXTENSION,
             table_partition_cols: vec![],
@@ -207,7 +224,7 @@ pub struct AvroReadOptions<'a> {
     pub schema: Option<SchemaRef>,
 
     /// File extension; only files with this extension are selected for data 
input.
-    /// Defaults to DEFAULT_AVRO_EXTENSION.
+    /// Defaults to `FileType::AVRO.get_ext().as_str()`.
     pub file_extension: &'a str,
     /// Partition Columns
     pub table_partition_cols: Vec<String>,
@@ -254,10 +271,13 @@ pub struct NdJsonReadOptions<'a> {
     pub schema_infer_max_records: usize,
 
     /// File extension; only files with this extension are selected for data 
input.
-    /// Defaults to DEFAULT_JSON_EXTENSION.
+    /// Defaults to `FileType::JSON.get_ext().as_str()`.
     pub file_extension: &'a str,
     /// Partition Columns
     pub table_partition_cols: Vec<String>,
+
+    /// File compression type
+    pub file_compression_type: FileCompressionType,
 }
 
 impl<'a> Default for NdJsonReadOptions<'a> {
@@ -267,6 +287,7 @@ impl<'a> Default for NdJsonReadOptions<'a> {
             schema_infer_max_records: DEFAULT_SCHEMA_INFER_MAX_RECORD,
             file_extension: DEFAULT_JSON_EXTENSION,
             table_partition_cols: vec![],
+            file_compression_type: FileCompressionType::UNCOMPRESSED,
         }
     }
 }
@@ -278,9 +299,25 @@ impl<'a> NdJsonReadOptions<'a> {
         self
     }
 
+    /// Specify file_extension
+    pub fn file_extension(mut self, file_extension: &'a str) -> Self {
+        self.file_extension = file_extension;
+        self
+    }
+
+    /// Specify file_compression_type
+    pub fn file_compression_type(
+        mut self,
+        file_compression_type: FileCompressionType,
+    ) -> Self {
+        self.file_compression_type = file_compression_type;
+        self
+    }
+
     /// Helper to convert these user facing options to `ListingTable` options
     pub fn to_listing_options(&self, target_partitions: usize) -> 
ListingOptions {
-        let file_format = JsonFormat::default();
+        let file_format = JsonFormat::default()
+            .with_file_compression_type(self.file_compression_type.to_owned());
         ListingOptions {
             format: Arc::new(file_format),
             collect_stat: false,
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs 
b/datafusion/core/src/physical_plan/file_format/csv.rs
index 885bea870..03e1e8059 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -17,6 +17,7 @@
 
 //! Execution plan for reading CSV files
 
+use crate::datasource::file_format::file_type::FileCompressionType;
 use crate::error::{DataFusionError, Result};
 use crate::execution::context::{SessionState, TaskContext};
 use crate::physical_plan::expressions::PhysicalSortExpr;
@@ -31,7 +32,9 @@ use crate::physical_plan::{
 };
 use arrow::csv;
 use arrow::datatypes::SchemaRef;
+
 use bytes::Buf;
+
 use futures::{StreamExt, TryStreamExt};
 use object_store::{GetResult, ObjectStore};
 use std::any::Any;
@@ -52,11 +55,17 @@ pub struct CsvExec {
     delimiter: u8,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
+    file_compression_type: FileCompressionType,
 }
 
 impl CsvExec {
     /// Create a new CSV reader execution plan provided base and specific 
configurations
-    pub fn new(base_config: FileScanConfig, has_header: bool, delimiter: u8) 
-> Self {
+    pub fn new(
+        base_config: FileScanConfig,
+        has_header: bool,
+        delimiter: u8,
+        file_compression_type: FileCompressionType,
+    ) -> Self {
         let (projected_schema, projected_statistics) = base_config.project();
 
         Self {
@@ -66,6 +75,7 @@ impl CsvExec {
             has_header,
             delimiter,
             metrics: ExecutionPlanMetricsSet::new(),
+            file_compression_type,
         }
     }
 
@@ -132,7 +142,10 @@ impl ExecutionPlan for CsvExec {
             delimiter: self.delimiter,
         });
 
-        let opener = CsvOpener { config };
+        let opener = CsvOpener {
+            config,
+            file_compression_type: self.file_compression_type.to_owned(),
+        };
         let stream = FileStream::new(
             &self.base_config,
             partition,
@@ -194,6 +207,7 @@ impl CsvConfig {
 
 struct CsvOpener {
     config: Arc<CsvConfig>,
+    file_compression_type: FileCompressionType,
 }
 
 impl FileOpener for CsvOpener {
@@ -203,14 +217,18 @@ impl FileOpener for CsvOpener {
         file_meta: FileMeta,
     ) -> Result<FileOpenFuture> {
         let config = self.config.clone();
+        let file_compression_type = self.file_compression_type.to_owned();
         Ok(Box::pin(async move {
             match store.get(file_meta.location()).await? {
                 GetResult::File(file, _) => {
-                    Ok(futures::stream::iter(config.open(file, true)).boxed())
+                    let decoder = file_compression_type.convert_read(file);
+                    Ok(futures::stream::iter(config.open(decoder, 
true)).boxed())
                 }
                 GetResult::Stream(s) => {
                     let mut first_chunk = true;
-                    Ok(newline_delimited_stream(s.map_err(Into::into))
+                    let s = s.map_err(Into::<DataFusionError>::into);
+                    let decoder = file_compression_type.convert_stream(s);
+                    Ok(newline_delimited_stream(decoder)
                         .map_ok(move |bytes| {
                             let reader = config.open(bytes.reader(), 
first_chunk);
                             first_chunk = false;
@@ -265,28 +283,48 @@ pub async fn plan_to_csv(
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::datasource::file_format::file_type::FileType;
     use crate::physical_plan::file_format::chunked_store::ChunkedStore;
     use crate::prelude::*;
-    use crate::test::partitioned_csv_config;
-    use crate::test_util::aggr_test_schema_with_missing_col;
+    use crate::test::{partitioned_csv_config, partitioned_file_groups};
+    use crate::test_util::{aggr_test_schema_with_missing_col, arrow_test_data};
     use crate::{scalar::ScalarValue, test_util::aggr_test_schema};
     use arrow::datatypes::*;
     use futures::StreamExt;
     use object_store::local::LocalFileSystem;
+    use rstest::*;
     use std::fs::File;
     use std::io::Write;
     use tempfile::TempDir;
 
+    #[rstest(
+        file_compression_type,
+        case(FileCompressionType::UNCOMPRESSED),
+        case(FileCompressionType::GZIP),
+        case(FileCompressionType::BZIP2)
+    )]
     #[tokio::test]
-    async fn csv_exec_with_projection() -> Result<()> {
+    async fn csv_exec_with_projection(
+        file_compression_type: FileCompressionType,
+    ) -> Result<()> {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let file_schema = aggr_test_schema();
+        let path = format!("{}/csv", arrow_test_data());
         let filename = "aggregate_test_100.csv";
-        let mut config = partitioned_csv_config(filename, file_schema, 1)?;
+
+        let file_groups = partitioned_file_groups(
+            path.as_str(),
+            filename,
+            1,
+            FileType::CSV,
+            file_compression_type.to_owned(),
+        )?;
+
+        let mut config = partitioned_csv_config(file_schema, file_groups)?;
         config.projection = Some(vec![0, 2, 4]);
 
-        let csv = CsvExec::new(config, true, b',');
+        let csv = CsvExec::new(config, true, b',', 
file_compression_type.to_owned());
         assert_eq!(13, csv.base_config.file_schema.fields().len());
         assert_eq!(3, csv.projected_schema.fields().len());
         assert_eq!(3, csv.schema().fields().len());
@@ -313,16 +351,34 @@ mod tests {
         Ok(())
     }
 
+    #[rstest(
+        file_compression_type,
+        case(FileCompressionType::UNCOMPRESSED),
+        case(FileCompressionType::GZIP),
+        case(FileCompressionType::BZIP2)
+    )]
     #[tokio::test]
-    async fn csv_exec_with_limit() -> Result<()> {
+    async fn csv_exec_with_limit(
+        file_compression_type: FileCompressionType,
+    ) -> Result<()> {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let file_schema = aggr_test_schema();
+        let path = format!("{}/csv", arrow_test_data());
         let filename = "aggregate_test_100.csv";
-        let mut config = partitioned_csv_config(filename, file_schema, 1)?;
+
+        let file_groups = partitioned_file_groups(
+            path.as_str(),
+            filename,
+            1,
+            FileType::CSV,
+            file_compression_type.to_owned(),
+        )?;
+
+        let mut config = partitioned_csv_config(file_schema, file_groups)?;
         config.limit = Some(5);
 
-        let csv = CsvExec::new(config, true, b',');
+        let csv = CsvExec::new(config, true, b',', 
file_compression_type.to_owned());
         assert_eq!(13, csv.base_config.file_schema.fields().len());
         assert_eq!(13, csv.projected_schema.fields().len());
         assert_eq!(13, csv.schema().fields().len());
@@ -349,16 +405,34 @@ mod tests {
         Ok(())
     }
 
+    #[rstest(
+        file_compression_type,
+        case(FileCompressionType::UNCOMPRESSED),
+        case(FileCompressionType::GZIP),
+        case(FileCompressionType::BZIP2)
+    )]
     #[tokio::test]
-    async fn csv_exec_with_missing_column() -> Result<()> {
+    async fn csv_exec_with_missing_column(
+        file_compression_type: FileCompressionType,
+    ) -> Result<()> {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let file_schema = aggr_test_schema_with_missing_col();
+        let path = format!("{}/csv", arrow_test_data());
         let filename = "aggregate_test_100.csv";
-        let mut config = partitioned_csv_config(filename, file_schema, 1)?;
+
+        let file_groups = partitioned_file_groups(
+            path.as_str(),
+            filename,
+            1,
+            FileType::CSV,
+            file_compression_type.to_owned(),
+        )?;
+
+        let mut config = partitioned_csv_config(file_schema, file_groups)?;
         config.limit = Some(5);
 
-        let csv = CsvExec::new(config, true, b',');
+        let csv = CsvExec::new(config, true, b',', 
file_compression_type.to_owned());
         assert_eq!(14, csv.base_config.file_schema.fields().len());
         assert_eq!(14, csv.projected_schema.fields().len());
         assert_eq!(14, csv.schema().fields().len());
@@ -385,13 +459,31 @@ mod tests {
         Ok(())
     }
 
+    #[rstest(
+        file_compression_type,
+        case(FileCompressionType::UNCOMPRESSED),
+        case(FileCompressionType::GZIP),
+        case(FileCompressionType::BZIP2)
+    )]
     #[tokio::test]
-    async fn csv_exec_with_partition() -> Result<()> {
+    async fn csv_exec_with_partition(
+        file_compression_type: FileCompressionType,
+    ) -> Result<()> {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let file_schema = aggr_test_schema();
+        let path = format!("{}/csv", arrow_test_data());
         let filename = "aggregate_test_100.csv";
-        let mut config = partitioned_csv_config(filename, file_schema.clone(), 
1)?;
+
+        let file_groups = partitioned_file_groups(
+            path.as_str(),
+            filename,
+            1,
+            FileType::CSV,
+            file_compression_type.to_owned(),
+        )?;
+
+        let mut config = partitioned_csv_config(file_schema, file_groups)?;
 
         // Add partition columns
         config.table_partition_cols = vec!["date".to_owned()];
@@ -400,11 +492,11 @@ mod tests {
 
         // We should be able to project on the partition column
         // Which is supposed to be after the file fields
-        config.projection = Some(vec![0, file_schema.fields().len()]);
+        config.projection = Some(vec![0, config.file_schema.fields().len()]);
 
         // we don't have `/date=xx/` in the path but that is ok because
         // partitions are resolved during scan anyway
-        let csv = CsvExec::new(config, true, b',');
+        let csv = CsvExec::new(config, true, b',', 
file_compression_type.to_owned());
         assert_eq!(13, csv.base_config.file_schema.fields().len());
         assert_eq!(2, csv.projected_schema.fields().len());
         assert_eq!(2, csv.schema().fields().len());
@@ -459,8 +551,14 @@ mod tests {
         Ok(schema)
     }
 
+    #[rstest(
+        file_compression_type,
+        case(FileCompressionType::UNCOMPRESSED),
+        case(FileCompressionType::GZIP),
+        case(FileCompressionType::BZIP2)
+    )]
     #[tokio::test]
-    async fn test_chunked() {
+    async fn test_chunked(file_compression_type: FileCompressionType) {
         let ctx = SessionContext::new();
         let chunk_sizes = [10, 20, 30, 40];
 
@@ -476,11 +574,21 @@ mod tests {
 
             let task_ctx = ctx.task_ctx();
 
-            let filename = "aggregate_test_100.csv";
             let file_schema = aggr_test_schema();
-            let config =
-                partitioned_csv_config(filename, file_schema.clone(), 
1).unwrap();
-            let csv = CsvExec::new(config, true, b',');
+            let path = format!("{}/csv", arrow_test_data());
+            let filename = "aggregate_test_100.csv";
+
+            let file_groups = partitioned_file_groups(
+                path.as_str(),
+                filename,
+                1,
+                FileType::CSV,
+                file_compression_type.to_owned(),
+            )
+            .unwrap();
+
+            let config = partitioned_csv_config(file_schema, 
file_groups).unwrap();
+            let csv = CsvExec::new(config, true, b',', 
file_compression_type.to_owned());
 
             let it = csv.execute(0, task_ctx).unwrap();
             let batches: Vec<_> = it.try_collect().await.unwrap();
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs 
b/datafusion/core/src/physical_plan/file_format/json.rs
index 10f148ad0..9475be156 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 //! Execution plan for reading line-delimited JSON files
+use crate::datasource::file_format::file_type::FileCompressionType;
 use crate::error::{DataFusionError, Result};
 use crate::execution::context::SessionState;
 use crate::execution::context::TaskContext;
@@ -31,7 +32,9 @@ use crate::physical_plan::{
 };
 use arrow::json::reader::DecoderOptions;
 use arrow::{datatypes::SchemaRef, json};
+
 use bytes::Buf;
+
 use futures::{StreamExt, TryStreamExt};
 use object_store::{GetResult, ObjectStore};
 use std::any::Any;
@@ -50,11 +53,15 @@ pub struct NdJsonExec {
     projected_schema: SchemaRef,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
+    file_compression_type: FileCompressionType,
 }
 
 impl NdJsonExec {
     /// Create a new JSON reader execution plan provided base configurations
-    pub fn new(base_config: FileScanConfig) -> Self {
+    pub fn new(
+        base_config: FileScanConfig,
+        file_compression_type: FileCompressionType,
+    ) -> Self {
         let (projected_schema, projected_statistics) = base_config.project();
 
         Self {
@@ -62,6 +69,7 @@ impl NdJsonExec {
             projected_schema,
             projected_statistics,
             metrics: ExecutionPlanMetricsSet::new(),
+            file_compression_type,
         }
     }
 }
@@ -118,6 +126,7 @@ impl ExecutionPlan for NdJsonExec {
         let opener = JsonOpener {
             file_schema,
             options,
+            file_compression_type: self.file_compression_type.to_owned(),
         };
 
         let stream = FileStream::new(
@@ -156,6 +165,7 @@ impl ExecutionPlan for NdJsonExec {
 struct JsonOpener {
     options: DecoderOptions,
     file_schema: SchemaRef,
+    file_compression_type: FileCompressionType,
 }
 
 impl FileOpener for JsonOpener {
@@ -166,14 +176,19 @@ impl FileOpener for JsonOpener {
     ) -> Result<FileOpenFuture> {
         let options = self.options.clone();
         let schema = self.file_schema.clone();
+        let file_compression_type = self.file_compression_type.to_owned();
         Ok(Box::pin(async move {
             match store.get(file_meta.location()).await? {
                 GetResult::File(file, _) => {
-                    let reader = json::Reader::new(file, schema.clone(), 
options);
+                    let decoder = file_compression_type.convert_read(file);
+                    let reader = json::Reader::new(decoder, schema.clone(), 
options);
                     Ok(futures::stream::iter(reader).boxed())
                 }
                 GetResult::Stream(s) => {
-                    Ok(newline_delimited_stream(s.map_err(Into::into))
+                    let s = s.map_err(Into::into);
+                    let decoder = file_compression_type.convert_stream(s);
+
+                    Ok(newline_delimited_stream(decoder)
                         .map_ok(move |bytes| {
                             let reader = json::Reader::new(
                                 bytes.reader(),
@@ -236,14 +251,17 @@ mod tests {
     use object_store::local::LocalFileSystem;
 
     use crate::assert_batches_eq;
+    use crate::datasource::file_format::file_type::FileType;
     use crate::datasource::file_format::{json::JsonFormat, FileFormat};
     use crate::datasource::listing::PartitionedFile;
     use crate::datasource::object_store::ObjectStoreUrl;
     use crate::physical_plan::file_format::chunked_store::ChunkedStore;
     use crate::prelude::NdJsonReadOptions;
     use crate::prelude::*;
-    use crate::test::object_store::local_unpartitioned_file;
+    use crate::test::partitioned_file_groups;
+    use rstest::*;
     use tempfile::TempDir;
+    use url::Url;
 
     use super::*;
 
@@ -251,38 +269,65 @@ mod tests {
 
     async fn prepare_store(
         ctx: &SessionContext,
+        file_compression_type: FileCompressionType,
     ) -> (ObjectStoreUrl, Vec<Vec<PartitionedFile>>, SchemaRef) {
         let store_url = ObjectStoreUrl::local_filesystem();
         let store = ctx.runtime_env().object_store(&store_url).unwrap();
 
-        let path = format!("{}/1.json", TEST_DATA_BASE);
-        let meta = local_unpartitioned_file(path);
+        let filename = "1.json";
+        let file_groups = partitioned_file_groups(
+            TEST_DATA_BASE,
+            filename,
+            1,
+            FileType::JSON,
+            file_compression_type.to_owned(),
+        )
+        .unwrap();
+        let meta = file_groups
+            .get(0)
+            .unwrap()
+            .get(0)
+            .unwrap()
+            .clone()
+            .object_meta;
         let schema = JsonFormat::default()
+            .with_file_compression_type(file_compression_type.to_owned())
             .infer_schema(&store, &[meta.clone()])
             .await
             .unwrap();
 
-        (store_url, vec![vec![meta.into()]], schema)
+        (store_url, file_groups, schema)
     }
 
+    #[rstest(
+        file_compression_type,
+        case(FileCompressionType::UNCOMPRESSED),
+        case(FileCompressionType::GZIP),
+        case(FileCompressionType::BZIP2)
+    )]
     #[tokio::test]
-    async fn nd_json_exec_file_without_projection() -> Result<()> {
+    async fn nd_json_exec_file_without_projection(
+        file_compression_type: FileCompressionType,
+    ) -> Result<()> {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         use arrow::datatypes::DataType;
 
         let (object_store_url, file_groups, file_schema) =
-            prepare_store(&session_ctx).await;
-
-        let exec = NdJsonExec::new(FileScanConfig {
-            object_store_url,
-            file_groups,
-            file_schema,
-            statistics: Statistics::default(),
-            projection: None,
-            limit: Some(3),
-            table_partition_cols: vec![],
-        });
+            prepare_store(&session_ctx, 
file_compression_type.to_owned()).await;
+
+        let exec = NdJsonExec::new(
+            FileScanConfig {
+                object_store_url,
+                file_groups,
+                file_schema,
+                statistics: Statistics::default(),
+                projection: None,
+                limit: Some(3),
+                table_partition_cols: vec![],
+            },
+            file_compression_type.to_owned(),
+        );
 
         // TODO: this is not where schema inference should be tested
 
@@ -324,13 +369,21 @@ mod tests {
         Ok(())
     }
 
+    #[rstest(
+        file_compression_type,
+        case(FileCompressionType::UNCOMPRESSED),
+        case(FileCompressionType::GZIP),
+        case(FileCompressionType::BZIP2)
+    )]
     #[tokio::test]
-    async fn nd_json_exec_file_with_missing_column() -> Result<()> {
+    async fn nd_json_exec_file_with_missing_column(
+        file_compression_type: FileCompressionType,
+    ) -> Result<()> {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         use arrow::datatypes::DataType;
         let (object_store_url, file_groups, actual_schema) =
-            prepare_store(&session_ctx).await;
+            prepare_store(&session_ctx, 
file_compression_type.to_owned()).await;
 
         let mut fields = actual_schema.fields().clone();
         fields.push(Field::new("missing_col", DataType::Int32, true));
@@ -338,15 +391,18 @@ mod tests {
 
         let file_schema = Arc::new(Schema::new(fields));
 
-        let exec = NdJsonExec::new(FileScanConfig {
-            object_store_url,
-            file_groups,
-            file_schema,
-            statistics: Statistics::default(),
-            projection: None,
-            limit: Some(3),
-            table_partition_cols: vec![],
-        });
+        let exec = NdJsonExec::new(
+            FileScanConfig {
+                object_store_url,
+                file_groups,
+                file_schema,
+                statistics: Statistics::default(),
+                projection: None,
+                limit: Some(3),
+                table_partition_cols: vec![],
+            },
+            file_compression_type.to_owned(),
+        );
 
         let mut it = exec.execute(0, task_ctx)?;
         let batch = it.next().await.unwrap()?;
@@ -365,22 +421,33 @@ mod tests {
         Ok(())
     }
 
+    #[rstest(
+        file_compression_type,
+        case(FileCompressionType::UNCOMPRESSED),
+        case(FileCompressionType::GZIP),
+        case(FileCompressionType::BZIP2)
+    )]
     #[tokio::test]
-    async fn nd_json_exec_file_projection() -> Result<()> {
+    async fn nd_json_exec_file_projection(
+        file_compression_type: FileCompressionType,
+    ) -> Result<()> {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let (object_store_url, file_groups, file_schema) =
-            prepare_store(&session_ctx).await;
-
-        let exec = NdJsonExec::new(FileScanConfig {
-            object_store_url,
-            file_groups,
-            file_schema,
-            statistics: Statistics::default(),
-            projection: Some(vec![0, 2]),
-            limit: None,
-            table_partition_cols: vec![],
-        });
+            prepare_store(&session_ctx, 
file_compression_type.to_owned()).await;
+
+        let exec = NdJsonExec::new(
+            FileScanConfig {
+                object_store_url,
+                file_groups,
+                file_schema,
+                statistics: Statistics::default(),
+                projection: Some(vec![0, 2]),
+                limit: None,
+                table_partition_cols: vec![],
+            },
+            file_compression_type.to_owned(),
+        );
         let inferred_schema = exec.schema();
         assert_eq!(inferred_schema.fields().len(), 2);
 
@@ -452,8 +519,14 @@ mod tests {
         Ok(())
     }
 
+    #[rstest(
+        file_compression_type,
+        case(FileCompressionType::UNCOMPRESSED),
+        case(FileCompressionType::GZIP),
+        case(FileCompressionType::BZIP2)
+    )]
     #[tokio::test]
-    async fn test_chunked() {
+    async fn test_chunked(file_compression_type: FileCompressionType) {
         let mut ctx = SessionContext::new();
 
         for chunk_size in [10, 20, 30, 40] {
@@ -466,8 +539,37 @@ mod tests {
                 )),
             );
 
-            let path = format!("{}/1.json", TEST_DATA_BASE);
-            let frame = ctx.read_json(path, Default::default()).await.unwrap();
+            let filename = "1.json";
+            let file_groups = partitioned_file_groups(
+                TEST_DATA_BASE,
+                filename,
+                1,
+                FileType::JSON,
+                file_compression_type.to_owned(),
+            )
+            .unwrap();
+            let path = file_groups
+                .get(0)
+                .unwrap()
+                .get(0)
+                .unwrap()
+                .object_meta
+                .location
+                .as_ref();
+
+            let store_url = ObjectStoreUrl::local_filesystem();
+            let url: &Url = store_url.as_ref();
+            let path_buf = Path::new(url.path()).join(path);
+            let path = path_buf.to_str().unwrap();
+
+            let ext = FileType::JSON
+                .get_ext_with_compression(file_compression_type.to_owned())
+                .unwrap();
+
+            let read_options = NdJsonReadOptions::default()
+                .file_extension(ext.as_str())
+                .file_compression_type(file_compression_type.to_owned());
+            let frame = ctx.read_json(path, read_options).await.unwrap();
             let results = frame.collect().await.unwrap();
 
             assert_batches_eq!(
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index 3b3b09567..dfc6d8edc 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -18,6 +18,8 @@
 //! Common unit test utility methods
 
 use crate::arrow::array::UInt32Array;
+use crate::datasource::file_format::file_type::{FileCompressionType, FileType};
+use crate::datasource::listing::PartitionedFile;
 use crate::datasource::object_store::ObjectStoreUrl;
 use crate::datasource::{MemTable, TableProvider};
 use crate::error::Result;
@@ -25,11 +27,15 @@ use crate::from_slice::FromSlice;
 use crate::logical_plan::LogicalPlan;
 use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
 use crate::test::object_store::local_unpartitioned_file;
-use crate::test_util::aggr_test_schema;
+use crate::test_util::{aggr_test_schema, arrow_test_data};
 use array::ArrayRef;
 use arrow::array::{self, Array, Decimal128Builder, Int32Array};
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
+use bzip2::write::BzEncoder;
+use bzip2::Compression as BzCompression;
+use flate2::write::GzEncoder;
+use flate2::Compression as GzCompression;
 use futures::{Future, FutureExt};
 use std::fs::File;
 use std::io::prelude::*;
@@ -58,63 +64,99 @@ pub fn create_table_dual() -> Arc<dyn TableProvider> {
 /// Returns a [`CsvExec`] that scans "aggregate_test_100.csv" with 
`partitions` partitions
 pub fn scan_partitioned_csv(partitions: usize) -> Result<Arc<CsvExec>> {
     let schema = aggr_test_schema();
-    let config = partitioned_csv_config("aggregate_test_100.csv", schema, 
partitions)?;
-    Ok(Arc::new(CsvExec::new(config, true, b',')))
+    let filename = "aggregate_test_100.csv";
+    let path = format!("{}/csv", arrow_test_data());
+    let file_groups = partitioned_file_groups(
+        path.as_str(),
+        filename,
+        partitions,
+        FileType::CSV,
+        FileCompressionType::UNCOMPRESSED,
+    )?;
+    let config = partitioned_csv_config(schema, file_groups)?;
+    Ok(Arc::new(CsvExec::new(
+        config,
+        true,
+        b',',
+        FileCompressionType::UNCOMPRESSED,
+    )))
 }
 
-/// Returns a [`FileScanConfig`] for scanning `partitions` partitions of 
`filename`
-pub fn partitioned_csv_config(
+/// Returns file groups [`Vec<Vec<PartitionedFile>>`] for scanning 
`partitions` partitions of `filename`
+pub fn partitioned_file_groups(
+    path: &str,
     filename: &str,
-    schema: SchemaRef,
     partitions: usize,
-) -> Result<FileScanConfig> {
-    let testdata = crate::test_util::arrow_test_data();
-    let path = format!("{}/csv/{}", testdata, filename);
+    file_type: FileType,
+    file_compression_type: FileCompressionType,
+) -> Result<Vec<Vec<PartitionedFile>>> {
+    let path = format!("{}/{}", path, filename);
 
-    let file_groups = if partitions > 1 {
-        let tmp_dir = TempDir::new()?.into_path();
+    let tmp_dir = TempDir::new()?.into_path();
 
-        let mut writers = vec![];
-        let mut files = vec![];
-        for i in 0..partitions {
-            let filename = format!("partition-{}.csv", i);
-            let filename = tmp_dir.join(&filename);
+    let mut writers = vec![];
+    let mut files = vec![];
+    for i in 0..partitions {
+        let filename = format!(
+            "partition-{}{}",
+            i,
+            file_type
+                .to_owned()
+                .get_ext_with_compression(file_compression_type.to_owned())
+                .unwrap()
+        );
+        let filename = tmp_dir.join(&filename);
 
-            let writer = BufWriter::new(File::create(&filename).unwrap());
-            writers.push(writer);
-            files.push(filename);
-        }
+        let file = File::create(&filename).unwrap();
 
-        let f = File::open(&path)?;
-        let f = BufReader::new(f);
-        for (i, line) in f.lines().enumerate() {
-            let line = line.unwrap();
-
-            if i == 0 {
-                // write header to all partitions
-                for w in writers.iter_mut() {
-                    w.write_all(line.as_bytes()).unwrap();
-                    w.write_all(b"\n").unwrap();
-                }
-            } else {
-                // write data line to single partition
-                let partition = i % partitions;
-                writers[partition].write_all(line.as_bytes()).unwrap();
-                writers[partition].write_all(b"\n").unwrap();
+        let encoder: Box<dyn Write + Send> = match 
file_compression_type.to_owned() {
+            FileCompressionType::UNCOMPRESSED => Box::new(file),
+            FileCompressionType::GZIP => {
+                Box::new(GzEncoder::new(file, GzCompression::default()))
             }
+            FileCompressionType::BZIP2 => {
+                Box::new(BzEncoder::new(file, BzCompression::default()))
+            }
+        };
+
+        let writer = BufWriter::new(encoder);
+        writers.push(writer);
+        files.push(filename);
+    }
+
+    let f = File::open(&path)?;
+    let f = BufReader::new(f);
+    for (i, line) in f.lines().enumerate() {
+        let line = line.unwrap();
+
+        if i == 0 && file_type == FileType::CSV {
+            // write header to all partitions
+            for w in writers.iter_mut() {
+                w.write_all(line.as_bytes()).unwrap();
+                w.write_all(b"\n").unwrap();
+            }
+        } else {
+            // write data line to single partition
+            let partition = i % partitions;
+            writers[partition].write_all(line.as_bytes()).unwrap();
+            writers[partition].write_all(b"\n").unwrap();
         }
-        for w in writers.iter_mut() {
-            w.flush().unwrap();
-        }
+    }
+    for w in writers.iter_mut() {
+        w.flush().unwrap();
+    }
 
-        files
-            .into_iter()
-            .map(|f| vec![local_unpartitioned_file(f).into()])
-            .collect::<Vec<_>>()
-    } else {
-        vec![vec![local_unpartitioned_file(path).into()]]
-    };
+    Ok(files
+        .into_iter()
+        .map(|f| vec![local_unpartitioned_file(f).into()])
+        .collect::<Vec<_>>())
+}
 
+/// Returns a [`FileScanConfig`] for given `file_groups`
+pub fn partitioned_csv_config(
+    schema: SchemaRef,
+    file_groups: Vec<Vec<PartitionedFile>>,
+) -> Result<FileScanConfig> {
     Ok(FileScanConfig {
         object_store_url: ObjectStoreUrl::local_filesystem(),
         file_schema: schema,
diff --git a/datafusion/expr/src/logical_plan/plan.rs 
b/datafusion/expr/src/logical_plan/plan.rs
index 7f5c95ab1..fd8280217 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -1268,6 +1268,8 @@ pub struct CreateExternalTable {
     pub if_not_exists: bool,
     /// SQL used to create the table, if available
     pub definition: Option<String>,
+    /// File compression type (GZIP, BZIP2)
+    pub file_compression_type: String,
 }
 
 /// Produces a relation with string representations of
diff --git a/datafusion/proto/proto/datafusion.proto 
b/datafusion/proto/proto/datafusion.proto
index 50545b581..9f2cc2d07 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -156,6 +156,7 @@ message CreateExternalTableNode {
   bool if_not_exists = 7;
   string delimiter = 8;
   string definition = 9;
+  string file_compression_type = 10;
 }
 
 message CreateCatalogSchemaNode {
diff --git a/datafusion/proto/src/logical_plan.rs 
b/datafusion/proto/src/logical_plan.rs
index 8f9f8a9c4..a5ddccdb6 100644
--- a/datafusion/proto/src/logical_plan.rs
+++ b/datafusion/proto/src/logical_plan.rs
@@ -492,6 +492,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                         .table_partition_cols
                         .clone(),
                     if_not_exists: create_extern_table.if_not_exists,
+                    file_compression_type: 
create_extern_table.file_compression_type.to_string(),
                     definition,
                 }))
             }
@@ -1042,6 +1043,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                 table_partition_cols,
                 if_not_exists,
                 definition,
+                file_compression_type,
             }) => Ok(protobuf::LogicalPlanNode {
                 logical_plan_type: Some(LogicalPlanType::CreateExternalTable(
                     protobuf::CreateExternalTableNode {
@@ -1054,6 +1056,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                         if_not_exists: *if_not_exists,
                         delimiter: String::from(*delimiter),
                         definition: definition.clone().unwrap_or_else(|| 
"".to_string()),
+                        file_compression_type: 
file_compression_type.to_string(),
                     },
                 )),
             }),
diff --git a/datafusion/sql/src/parser.rs b/datafusion/sql/src/parser.rs
index b9254159b..cc9078e2a 100644
--- a/datafusion/sql/src/parser.rs
+++ b/datafusion/sql/src/parser.rs
@@ -38,6 +38,10 @@ fn parse_file_type(s: &str) -> Result<String, ParserError> {
     Ok(s.to_uppercase())
 }
 
+fn parse_file_compression_type(s: &str) -> Result<String, ParserError> {
+    Ok(s.to_uppercase())
+}
+
 /// DataFusion extension DDL for `CREATE EXTERNAL TABLE`
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct CreateExternalTable {
@@ -57,6 +61,8 @@ pub struct CreateExternalTable {
     pub table_partition_cols: Vec<String>,
     /// Option to not error if table already exists
     pub if_not_exists: bool,
+    /// File compression type (GZIP, BZIP2)
+    pub file_compression_type: String,
 }
 
 impl fmt::Display for CreateExternalTable {
@@ -330,6 +336,12 @@ impl<'a> DFParser<'a> {
             false => ',',
         };
 
+        let file_compression_type = if self.parse_has_file_compression_type() {
+            self.parse_file_compression_type()?
+        } else {
+            "".to_string()
+        };
+
         let table_partition_cols = if self.parse_has_partition() {
             self.parse_partitions()?
         } else {
@@ -348,6 +360,7 @@ impl<'a> DFParser<'a> {
             location,
             table_partition_cols,
             if_not_exists,
+            file_compression_type,
         };
         Ok(Statement::CreateExternalTable(create))
     }
@@ -360,6 +373,14 @@ impl<'a> DFParser<'a> {
         }
     }
 
+    /// Parses the compression type word (e.g. GZIP, BZIP2) that follows `COMPRESSION TYPE`
+    fn parse_file_compression_type(&mut self) -> Result<String, ParserError> {
+        match self.parser.next_token() {
+            Token::Word(w) => parse_file_compression_type(&w.value),
+            unexpected => self.expected("one of GZIP, BZIP2", unexpected),
+        }
+    }
+
     fn consume_token(&mut self, expected: &Token) -> bool {
         let token = self.parser.peek_token().to_string().to_uppercase();
         let token = Token::make_keyword(&token);
@@ -370,6 +391,10 @@ impl<'a> DFParser<'a> {
             false
         }
     }
+    fn parse_has_file_compression_type(&mut self) -> bool {
+        self.consume_token(&Token::make_keyword("COMPRESSION"))
+            & self.consume_token(&Token::make_keyword("TYPE"))
+    }
 
     fn parse_csv_has_header(&mut self) -> bool {
         self.consume_token(&Token::make_keyword("WITH"))
@@ -460,6 +485,7 @@ mod tests {
             location: "foo.csv".into(),
             table_partition_cols: vec![],
             if_not_exists: false,
+            file_compression_type: "".to_string(),
         });
         expect_parse_ok(sql, expected)?;
 
@@ -475,6 +501,7 @@ mod tests {
             location: "foo.csv".into(),
             table_partition_cols: vec![],
             if_not_exists: false,
+            file_compression_type: "".to_string(),
         });
         expect_parse_ok(sql, expected)?;
 
@@ -490,6 +517,7 @@ mod tests {
             location: "foo.csv".into(),
             table_partition_cols: vec!["p1".to_string(), "p2".to_string()],
             if_not_exists: false,
+            file_compression_type: "".to_string(),
         });
         expect_parse_ok(sql, expected)?;
 
@@ -508,6 +536,27 @@ mod tests {
                 location: "foo.csv".into(),
                 table_partition_cols: vec![],
                 if_not_exists: false,
+                file_compression_type: "".to_string(),
+            });
+            expect_parse_ok(sql, expected)?;
+        }
+
+        // positive case: it is ok for sql stmt with `COMPRESSION TYPE GZIP` 
tokens
+        let sqls = vec![
+            ("CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV COMPRESSION TYPE 
GZIP LOCATION 'foo.csv'", "GZIP"),
+            ("CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV COMPRESSION TYPE 
BZIP2 LOCATION 'foo.csv'", "BZIP2"),
+        ];
+        for (sql, file_compression_type) in sqls {
+            let expected = Statement::CreateExternalTable(CreateExternalTable {
+                name: "t".into(),
+                columns: vec![make_column_def("c1", DataType::Int(display))],
+                file_type: "CSV".to_string(),
+                has_header: false,
+                delimiter: ',',
+                location: "foo.csv".into(),
+                table_partition_cols: vec![],
+                if_not_exists: false,
+                file_compression_type: file_compression_type.to_owned(),
             });
             expect_parse_ok(sql, expected)?;
         }
@@ -523,6 +572,7 @@ mod tests {
             location: "foo.parquet".into(),
             table_partition_cols: vec![],
             if_not_exists: false,
+            file_compression_type: "".to_string(),
         });
         expect_parse_ok(sql, expected)?;
 
@@ -537,6 +587,7 @@ mod tests {
             location: "foo.parquet".into(),
             table_partition_cols: vec![],
             if_not_exists: false,
+            file_compression_type: "".to_string(),
         });
         expect_parse_ok(sql, expected)?;
 
@@ -551,6 +602,7 @@ mod tests {
             location: "foo.avro".into(),
             table_partition_cols: vec![],
             if_not_exists: false,
+            file_compression_type: "".to_string(),
         });
         expect_parse_ok(sql, expected)?;
 
@@ -566,6 +618,7 @@ mod tests {
             location: "foo.parquet".into(),
             table_partition_cols: vec![],
             if_not_exists: true,
+            file_compression_type: "".to_string(),
         });
         expect_parse_ok(sql, expected)?;
 
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index 58b65af59..b75efd6f4 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -478,6 +478,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             location,
             table_partition_cols,
             if_not_exists,
+            file_compression_type,
         } = statement;
 
         // semantic checks
@@ -487,6 +488,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             ))?;
         }
 
+        if file_type != "CSV" && file_type != "JSON" && 
!file_compression_type.is_empty()
+        {
+            Err(DataFusionError::Plan(
+                "File compression type can be specified for CSV/JSON 
files.".into(),
+            ))?;
+        }
+
         let schema = self.build_schema(columns)?;
 
         Ok(LogicalPlan::CreateExternalTable(PlanCreateExternalTable {
@@ -499,6 +507,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             table_partition_cols,
             if_not_exists,
             definition,
+            file_compression_type,
         }))
     }
 
@@ -3998,6 +4007,36 @@ mod tests {
         quick_test(sql, expected);
     }
 
+    #[test]
+    fn create_external_table_with_compression_type() {
+        // positive case
+        let sqls = vec![
+            "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV COMPRESSION TYPE 
GZIP LOCATION 'foo.csv.gz'",
+            "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV COMPRESSION TYPE 
BZIP2 LOCATION 'foo.csv.bz2'",
+            "CREATE EXTERNAL TABLE t(c1 int) STORED AS JSON COMPRESSION TYPE 
GZIP LOCATION 'foo.json.gz'",
+            "CREATE EXTERNAL TABLE t(c1 int) STORED AS JSON COMPRESSION TYPE 
BZIP2 LOCATION 'foo.json.bz2'",
+        ];
+        for sql in sqls {
+            let expected = "CreateExternalTable: \"t\"";
+            quick_test(sql, expected);
+        }
+
+        // negative case
+        let sqls = vec![
+            "CREATE EXTERNAL TABLE t STORED AS AVRO COMPRESSION TYPE GZIP 
LOCATION 'foo.avro'",
+            "CREATE EXTERNAL TABLE t STORED AS AVRO COMPRESSION TYPE BZIP2 
LOCATION 'foo.avro'",
+            "CREATE EXTERNAL TABLE t STORED AS PARQUET COMPRESSION TYPE GZIP 
LOCATION 'foo.parquet'",
+            "CREATE EXTERNAL TABLE t STORED AS PARQUET COMPRESSION TYPE BZIP2 
LOCATION 'foo.parquet'",
+        ];
+        for sql in sqls {
+            let err = logical_plan(sql).expect_err("query should have failed");
+            assert_eq!(
+                "Plan(\"File compression type can be specified for CSV/JSON 
files.\")",
+                format!("{:?}", err)
+            );
+        }
+    }
+
     #[test]
     fn create_external_table_parquet() {
         let sql =

Reply via email to