This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new b8a3a78f8 Compressed CSV/JSON support (#3642)
b8a3a78f8 is described below
commit b8a3a78f833fae8faace8d7542a1fb3d7a497b6a
Author: Rito Takeuchi <[email protected]>
AuthorDate: Wed Oct 12 00:27:12 2022 +0900
Compressed CSV/JSON support (#3642)
* Compression text support
* Fix the path joining issue on Windows test
* Debug code for Windows CI
* Utilize `std::path::Path`, instead of `url::Url`
---
.../examples/parquet_sql_multiple_files.rs | 7 +-
datafusion/common/src/error.rs | 6 +
datafusion/core/Cargo.toml | 5 +
datafusion/core/src/datasource/file_format/csv.rs | 27 +-
.../core/src/datasource/file_format/file_type.rs | 331 +++++++++++++++++++++
datafusion/core/src/datasource/file_format/json.rs | 25 +-
datafusion/core/src/datasource/file_format/mod.rs | 1 +
datafusion/core/src/datasource/listing/table.rs | 57 ++--
datafusion/core/src/execution/context.rs | 59 ++--
datafusion/core/src/execution/options.rs | 55 +++-
.../core/src/physical_plan/file_format/csv.rs | 156 ++++++++--
.../core/src/physical_plan/file_format/json.rs | 192 +++++++++---
datafusion/core/src/test/mod.rs | 134 ++++++---
datafusion/expr/src/logical_plan/plan.rs | 2 +
datafusion/proto/proto/datafusion.proto | 1 +
datafusion/proto/src/logical_plan.rs | 3 +
datafusion/sql/src/parser.rs | 53 ++++
datafusion/sql/src/planner.rs | 39 +++
18 files changed, 976 insertions(+), 177 deletions(-)
diff --git a/datafusion-examples/examples/parquet_sql_multiple_files.rs
b/datafusion-examples/examples/parquet_sql_multiple_files.rs
index 2b7a5f6fe..bf5bd94a5 100644
--- a/datafusion-examples/examples/parquet_sql_multiple_files.rs
+++ b/datafusion-examples/examples/parquet_sql_multiple_files.rs
@@ -15,9 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-use datafusion::datasource::file_format::parquet::{
- ParquetFormat, DEFAULT_PARQUET_EXTENSION,
-};
+use datafusion::datasource::file_format::file_type::{FileType, GetExt};
+use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;
use datafusion::error::Result;
use datafusion::prelude::*;
@@ -35,7 +34,7 @@ async fn main() -> Result<()> {
// Configure listing options
let file_format = ParquetFormat::default().with_enable_pruning(true);
let listing_options = ListingOptions {
- file_extension: DEFAULT_PARQUET_EXTENSION.to_owned(),
+ file_extension: FileType::PARQUET.get_ext(),
format: Arc::new(file_format),
table_partition_cols: vec![],
collect_stat: true,
diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs
index 9157e04a2..c2d3e389c 100644
--- a/datafusion/common/src/error.rs
+++ b/datafusion/common/src/error.rs
@@ -318,6 +318,12 @@ impl Display for DataFusionError {
impl error::Error for DataFusionError {}
+impl From<DataFusionError> for io::Error {
+ fn from(e: DataFusionError) -> Self {
+ io::Error::new(io::ErrorKind::Other, e)
+ }
+}
+
#[cfg(test)]
mod test {
use crate::error::DataFusionError;
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index f7a9ce4e9..44a5cbe1a 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -57,8 +57,10 @@ unicode_expressions =
["datafusion-physical-expr/regex_expressions", "datafusion
ahash = { version = "0.8", default-features = false, features =
["runtime-rng"] }
apache-avro = { version = "0.14", optional = true }
arrow = { version = "24.0.0", features = ["prettyprint"] }
+async-compression = { version = "0.3.14", features = ["bzip2", "gzip",
"futures-io", "tokio"] }
async-trait = "0.1.41"
bytes = "1.1"
+bzip2 = "0.4.3"
chrono = { version = "0.4", default-features = false }
datafusion-common = { path = "../common", version = "13.0.0", features =
["parquet", "object_store"] }
datafusion-expr = { path = "../expr", version = "13.0.0" }
@@ -67,6 +69,7 @@ datafusion-optimizer = { path = "../optimizer", version =
"13.0.0" }
datafusion-physical-expr = { path = "../physical-expr", version = "13.0.0" }
datafusion-row = { path = "../row", version = "13.0.0" }
datafusion-sql = { path = "../sql", version = "13.0.0" }
+flate2 = "1.0.24"
futures = "0.3"
glob = "0.3.0"
hashbrown = { version = "0.12", features = ["raw"] }
@@ -90,6 +93,7 @@ sqlparser = "0.25"
tempfile = "3"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread",
"sync", "fs", "parking_lot"] }
tokio-stream = "0.1"
+tokio-util = { version = "0.7.4", features = ["io"] }
url = "2.2"
uuid = { version = "1.0", features = ["v4"] }
@@ -102,6 +106,7 @@ ctor = "0.1.22"
doc-comment = "0.3"
env_logger = "0.9"
fuzz-utils = { path = "fuzz-utils" }
+rstest = "0.15.0"
[[bench]]
harness = false
diff --git a/datafusion/core/src/datasource/file_format/csv.rs
b/datafusion/core/src/datasource/file_format/csv.rs
index 2e6994f4b..6a99e35b8 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -18,16 +18,21 @@
//! CSV format abstractions
use std::any::Any;
+
use std::sync::Arc;
use arrow::datatypes::Schema;
use arrow::{self, datatypes::SchemaRef};
use async_trait::async_trait;
+use bytes::Buf;
+
use datafusion_common::DataFusionError;
+
use futures::TryFutureExt;
use object_store::{ObjectMeta, ObjectStore};
use super::FileFormat;
+use crate::datasource::file_format::file_type::FileCompressionType;
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
use crate::error::Result;
use crate::logical_plan::Expr;
@@ -43,6 +48,7 @@ pub struct CsvFormat {
has_header: bool,
delimiter: u8,
schema_infer_max_rec: Option<usize>,
+ file_compression_type: FileCompressionType,
}
impl Default for CsvFormat {
@@ -51,6 +57,7 @@ impl Default for CsvFormat {
schema_infer_max_rec: Some(DEFAULT_SCHEMA_INFER_MAX_RECORD),
has_header: true,
delimiter: b',',
+ file_compression_type: FileCompressionType::UNCOMPRESSED,
}
}
}
@@ -82,6 +89,16 @@ impl CsvFormat {
self
}
+ /// Set the `FileCompressionType` of the CSV files
+ /// - defaults to `FileCompressionType::UNCOMPRESSED`
+ pub fn with_file_compression_type(
+ mut self,
+ file_compression_type: FileCompressionType,
+ ) -> Self {
+ self.file_compression_type = file_compression_type;
+ self
+ }
+
/// The delimiter character.
pub fn delimiter(&self) -> u8 {
self.delimiter
@@ -110,8 +127,9 @@ impl FileFormat for CsvFormat {
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
+ let decoder =
self.file_compression_type.convert_read(data.reader());
let (schema, records_read) =
arrow::csv::reader::infer_reader_schema(
- &mut data.as_ref(),
+ decoder,
self.delimiter,
Some(records_to_read),
self.has_header,
@@ -144,7 +162,12 @@ impl FileFormat for CsvFormat {
conf: FileScanConfig,
_filters: &[Expr],
) -> Result<Arc<dyn ExecutionPlan>> {
- let exec = CsvExec::new(conf, self.has_header, self.delimiter);
+ let exec = CsvExec::new(
+ conf,
+ self.has_header,
+ self.delimiter,
+ self.file_compression_type.to_owned(),
+ );
Ok(Arc::new(exec))
}
}
diff --git a/datafusion/core/src/datasource/file_format/file_type.rs
b/datafusion/core/src/datasource/file_format/file_type.rs
new file mode 100644
index 000000000..f08a21ca1
--- /dev/null
+++ b/datafusion/core/src/datasource/file_format/file_type.rs
@@ -0,0 +1,331 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! File type abstraction
+
+use crate::error::{DataFusionError, Result};
+use std::io::Error;
+
+use async_compression::tokio::bufread::{
+ BzDecoder as AsyncBzDecoder, GzipDecoder as AsyncGzDecoder,
+};
+use bzip2::read::BzDecoder;
+
+use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION;
+use crate::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
+use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION;
+use crate::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
+use bytes::Bytes;
+use flate2::read::GzDecoder;
+use futures::{Stream, TryStreamExt};
+use std::str::FromStr;
+use tokio_util::io::{ReaderStream, StreamReader};
+
+/// Defines the file extension associated with each `FileType`/`FileCompressionType`
+pub trait GetExt {
+ /// File extension getter
+ fn get_ext(&self) -> String;
+}
+
+/// Readable file compression type
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum FileCompressionType {
+ /// Gzip-ed file
+ GZIP,
+ /// Bzip2-ed file
+ BZIP2,
+ /// Uncompressed file
+ UNCOMPRESSED,
+}
+
+impl GetExt for FileCompressionType {
+ fn get_ext(&self) -> String {
+ match self {
+ FileCompressionType::GZIP => ".gz".to_owned(),
+ FileCompressionType::BZIP2 => ".bz2".to_owned(),
+ FileCompressionType::UNCOMPRESSED => "".to_owned(),
+ }
+ }
+}
+
+impl FromStr for FileCompressionType {
+ type Err = DataFusionError;
+
+ fn from_str(s: &str) -> Result<Self> {
+ let s = s.to_uppercase();
+ match s.as_str() {
+ "GZIP" | "GZ" => Ok(FileCompressionType::GZIP),
+ "BZIP2" | "BZ2" => Ok(FileCompressionType::BZIP2),
+ "" => Ok(FileCompressionType::UNCOMPRESSED),
+ _ => Err(DataFusionError::NotImplemented(format!(
+ "Unknown FileCompressionType: {}",
+ s
+ ))),
+ }
+ }
+}
+
+/// `FileCompressionType` implementation
+impl FileCompressionType {
+ /// Given a `Stream`, create a `Stream` whose data are decompressed according to `FileCompressionType`.
+ pub fn convert_stream<T: Stream<Item = Result<Bytes>> + Unpin + Send +
'static>(
+ &self,
+ s: T,
+ ) -> Box<dyn Stream<Item = Result<Bytes>> + Send + Unpin> {
+ let err_converter = |e: Error| match e
+ .get_ref()
+ .and_then(|e| e.downcast_ref::<DataFusionError>())
+ {
+ Some(_) => {
+ *(e.into_inner()
+ .unwrap()
+ .downcast::<DataFusionError>()
+ .unwrap())
+ }
+ None => Into::<DataFusionError>::into(e),
+ };
+
+ match self {
+ FileCompressionType::GZIP => Box::new(
+ ReaderStream::new(AsyncGzDecoder::new(StreamReader::new(s)))
+ .map_err(err_converter),
+ ),
+ FileCompressionType::BZIP2 => Box::new(
+ ReaderStream::new(AsyncBzDecoder::new(StreamReader::new(s)))
+ .map_err(err_converter),
+ ),
+ FileCompressionType::UNCOMPRESSED => Box::new(s),
+ }
+ }
+
+ /// Given a `Read`, create a `Read` whose data are decompressed according to `FileCompressionType`.
+ pub fn convert_read<T: std::io::Read + Send + 'static>(
+ &self,
+ r: T,
+ ) -> Box<dyn std::io::Read + Send> {
+ match self {
+ FileCompressionType::GZIP => Box::new(GzDecoder::new(r)),
+ FileCompressionType::BZIP2 => Box::new(BzDecoder::new(r)),
+ FileCompressionType::UNCOMPRESSED => Box::new(r),
+ }
+ }
+}
+
+/// Readable file type
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum FileType {
+ /// Apache Avro file
+ AVRO,
+ /// Apache Parquet file
+ PARQUET,
+ /// CSV file
+ CSV,
+ /// JSON file
+ JSON,
+}
+
+impl GetExt for FileType {
+ fn get_ext(&self) -> String {
+ match self {
+ FileType::AVRO => DEFAULT_AVRO_EXTENSION.to_owned(),
+ FileType::PARQUET => DEFAULT_PARQUET_EXTENSION.to_owned(),
+ FileType::CSV => DEFAULT_CSV_EXTENSION.to_owned(),
+ FileType::JSON => DEFAULT_JSON_EXTENSION.to_owned(),
+ }
+ }
+}
+
+impl FromStr for FileType {
+ type Err = DataFusionError;
+
+ fn from_str(s: &str) -> Result<Self> {
+ let s = s.to_uppercase();
+ match s.as_str() {
+ "AVRO" => Ok(FileType::AVRO),
+ "PARQUET" => Ok(FileType::PARQUET),
+ "CSV" => Ok(FileType::CSV),
+ "JSON" => Ok(FileType::JSON),
+ _ => Err(DataFusionError::NotImplemented(format!(
+ "Unknown FileType: {}",
+ s
+ ))),
+ }
+ }
+}
+
+impl FileType {
+ /// Given a `FileCompressionType`, return the `FileType`'s extension with the compression suffix appended
+ pub fn get_ext_with_compression(&self, c: FileCompressionType) ->
Result<String> {
+ let ext = self.get_ext();
+
+ match self {
+ FileType::JSON | FileType::CSV => Ok(format!("{}{}", ext,
c.get_ext())),
+ FileType::PARQUET | FileType::AVRO => match c {
+ FileCompressionType::UNCOMPRESSED => Ok(ext),
+ _ => Err(DataFusionError::Internal(
+ "FileCompressionType can be specified for CSV/JSON
FileType.".into(),
+ )),
+ },
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use crate::datasource::file_format::file_type::{FileCompressionType,
FileType};
+ use crate::error::DataFusionError;
+ use std::str::FromStr;
+
+ #[test]
+ fn get_ext_with_compression() {
+ let file_type = FileType::CSV;
+ assert_eq!(
+ file_type
+ .get_ext_with_compression(FileCompressionType::UNCOMPRESSED)
+ .unwrap(),
+ ".csv"
+ );
+ assert_eq!(
+ file_type
+ .get_ext_with_compression(FileCompressionType::GZIP)
+ .unwrap(),
+ ".csv.gz"
+ );
+ assert_eq!(
+ file_type
+ .get_ext_with_compression(FileCompressionType::BZIP2)
+ .unwrap(),
+ ".csv.bz2"
+ );
+
+ let file_type = FileType::JSON;
+ assert_eq!(
+ file_type
+ .get_ext_with_compression(FileCompressionType::UNCOMPRESSED)
+ .unwrap(),
+ ".json"
+ );
+ assert_eq!(
+ file_type
+ .get_ext_with_compression(FileCompressionType::GZIP)
+ .unwrap(),
+ ".json.gz"
+ );
+ assert_eq!(
+ file_type
+ .get_ext_with_compression(FileCompressionType::BZIP2)
+ .unwrap(),
+ ".json.bz2"
+ );
+
+ let file_type = FileType::AVRO;
+ assert_eq!(
+ file_type
+ .get_ext_with_compression(FileCompressionType::UNCOMPRESSED)
+ .unwrap(),
+ ".avro"
+ );
+ assert!(matches!(
+ file_type.get_ext_with_compression(FileCompressionType::GZIP),
+ Err(DataFusionError::Internal(_))
+ ));
+ assert!(matches!(
+ file_type.get_ext_with_compression(FileCompressionType::BZIP2),
+ Err(DataFusionError::Internal(_))
+ ));
+
+ let file_type = FileType::PARQUET;
+ assert_eq!(
+ file_type
+ .get_ext_with_compression(FileCompressionType::UNCOMPRESSED)
+ .unwrap(),
+ ".parquet"
+ );
+ assert!(matches!(
+ file_type.get_ext_with_compression(FileCompressionType::GZIP),
+ Err(DataFusionError::Internal(_))
+ ));
+ assert!(matches!(
+ file_type.get_ext_with_compression(FileCompressionType::BZIP2),
+ Err(DataFusionError::Internal(_))
+ ));
+ }
+
+ #[test]
+ fn from_str() {
+ assert_eq!(FileType::from_str("csv").unwrap(), FileType::CSV);
+ assert_eq!(FileType::from_str("CSV").unwrap(), FileType::CSV);
+
+ assert_eq!(FileType::from_str("json").unwrap(), FileType::JSON);
+ assert_eq!(FileType::from_str("JSON").unwrap(), FileType::JSON);
+
+ assert_eq!(FileType::from_str("avro").unwrap(), FileType::AVRO);
+ assert_eq!(FileType::from_str("AVRO").unwrap(), FileType::AVRO);
+
+ assert_eq!(FileType::from_str("parquet").unwrap(), FileType::PARQUET);
+ assert_eq!(FileType::from_str("PARQUET").unwrap(), FileType::PARQUET);
+
+ assert!(matches!(
+ FileType::from_str("Unknown"),
+ Err(DataFusionError::NotImplemented(_))
+ ));
+
+ assert_eq!(
+ FileCompressionType::from_str("gz").unwrap(),
+ FileCompressionType::GZIP
+ );
+ assert_eq!(
+ FileCompressionType::from_str("GZ").unwrap(),
+ FileCompressionType::GZIP
+ );
+ assert_eq!(
+ FileCompressionType::from_str("gzip").unwrap(),
+ FileCompressionType::GZIP
+ );
+ assert_eq!(
+ FileCompressionType::from_str("GZIP").unwrap(),
+ FileCompressionType::GZIP
+ );
+
+ assert_eq!(
+ FileCompressionType::from_str("bz2").unwrap(),
+ FileCompressionType::BZIP2
+ );
+ assert_eq!(
+ FileCompressionType::from_str("BZ2").unwrap(),
+ FileCompressionType::BZIP2
+ );
+ assert_eq!(
+ FileCompressionType::from_str("bzip2").unwrap(),
+ FileCompressionType::BZIP2
+ );
+ assert_eq!(
+ FileCompressionType::from_str("BZIP2").unwrap(),
+ FileCompressionType::BZIP2
+ );
+
+ assert_eq!(
+ FileCompressionType::from_str("").unwrap(),
+ FileCompressionType::UNCOMPRESSED
+ );
+
+ assert!(matches!(
+ FileCompressionType::from_str("Unknown"),
+ Err(DataFusionError::NotImplemented(_))
+ ));
+ }
+}
diff --git a/datafusion/core/src/datasource/file_format/json.rs
b/datafusion/core/src/datasource/file_format/json.rs
index 9a889ab4c..02a684e85 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -18,6 +18,7 @@
//! Line delimited JSON format abstractions
use std::any::Any;
+
use std::io::BufReader;
use std::sync::Arc;
@@ -26,10 +27,13 @@ use arrow::datatypes::SchemaRef;
use arrow::json::reader::infer_json_schema_from_iterator;
use arrow::json::reader::ValueIter;
use async_trait::async_trait;
+use bytes::Buf;
+
use object_store::{GetResult, ObjectMeta, ObjectStore};
use super::FileFormat;
use super::FileScanConfig;
+use crate::datasource::file_format::file_type::FileCompressionType;
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
use crate::error::Result;
use crate::logical_plan::Expr;
@@ -43,12 +47,14 @@ pub const DEFAULT_JSON_EXTENSION: &str = ".json";
#[derive(Debug)]
pub struct JsonFormat {
schema_infer_max_rec: Option<usize>,
+ file_compression_type: FileCompressionType,
}
impl Default for JsonFormat {
fn default() -> Self {
Self {
schema_infer_max_rec: Some(DEFAULT_SCHEMA_INFER_MAX_RECORD),
+ file_compression_type: FileCompressionType::UNCOMPRESSED,
}
}
}
@@ -60,6 +66,16 @@ impl JsonFormat {
self.schema_infer_max_rec = max_rec;
self
}
+
+ /// Set the `FileCompressionType` of the JSON files
+ /// - defaults to `FileCompressionType::UNCOMPRESSED`
+ pub fn with_file_compression_type(
+ mut self,
+ file_compression_type: FileCompressionType,
+ ) -> Self {
+ self.file_compression_type = file_compression_type;
+ self
+ }
}
#[async_trait]
@@ -75,6 +91,7 @@ impl FileFormat for JsonFormat {
) -> Result<SchemaRef> {
let mut schemas = Vec::new();
let mut records_to_read =
self.schema_infer_max_rec.unwrap_or(usize::MAX);
+ let file_compression_type = self.file_compression_type.to_owned();
for object in objects {
let mut take_while = || {
let should_take = records_to_read > 0;
@@ -86,13 +103,15 @@ impl FileFormat for JsonFormat {
let schema = match store.get(&object.location).await? {
GetResult::File(file, _) => {
- let mut reader = BufReader::new(file);
+ let decoder = file_compression_type.convert_read(file);
+ let mut reader = BufReader::new(decoder);
let iter = ValueIter::new(&mut reader, None);
infer_json_schema_from_iterator(iter.take_while(|_|
take_while()))?
}
r @ GetResult::Stream(_) => {
let data = r.bytes().await?;
- let mut reader = BufReader::new(data.as_ref());
+ let decoder =
file_compression_type.convert_read(data.reader());
+ let mut reader = BufReader::new(decoder);
let iter = ValueIter::new(&mut reader, None);
infer_json_schema_from_iterator(iter.take_while(|_|
take_while()))?
}
@@ -122,7 +141,7 @@ impl FileFormat for JsonFormat {
conf: FileScanConfig,
_filters: &[Expr],
) -> Result<Arc<dyn ExecutionPlan>> {
- let exec = NdJsonExec::new(conf);
+ let exec = NdJsonExec::new(conf,
self.file_compression_type.to_owned());
Ok(Arc::new(exec))
}
}
diff --git a/datafusion/core/src/datasource/file_format/mod.rs
b/datafusion/core/src/datasource/file_format/mod.rs
index b16525c7a..7b9421bc7 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -22,6 +22,7 @@ pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000;
pub mod avro;
pub mod csv;
+pub mod file_type;
pub mod json;
pub mod parquet;
diff --git a/datafusion/core/src/datasource/listing/table.rs
b/datafusion/core/src/datasource/listing/table.rs
index f6c617f0c..72f5b9827 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -18,6 +18,7 @@
//! The table implementation.
use ahash::HashMap;
+use std::str::FromStr;
use std::{any::Any, sync::Arc};
use arrow::datatypes::{Field, Schema, SchemaRef};
@@ -27,6 +28,7 @@ use object_store::path::Path;
use object_store::ObjectMeta;
use parking_lot::RwLock;
+use crate::datasource::file_format::file_type::{FileCompressionType, FileType};
use crate::datasource::{
file_format::{
avro::AvroFormat, csv::CsvFormat, json::JsonFormat,
parquet::ParquetFormat,
@@ -102,17 +104,39 @@ impl ListingTableConfig {
}
}
- fn infer_format(suffix: &str) -> Result<Arc<dyn FileFormat>> {
- match suffix {
- "avro" => Ok(Arc::new(AvroFormat::default())),
- "csv" => Ok(Arc::new(CsvFormat::default())),
- "json" => Ok(Arc::new(JsonFormat::default())),
- "parquet" => Ok(Arc::new(ParquetFormat::default())),
- _ => Err(DataFusionError::Internal(format!(
- "Unable to infer file type from suffix {}",
- suffix
- ))),
+ fn infer_format(path: &str) -> Result<(Arc<dyn FileFormat>, String)> {
+ let err_msg = format!("Unable to infer file type from path: {}", path);
+
+ let mut exts = path.rsplit('.');
+
+ let mut splitted = exts.next().unwrap_or("");
+
+ let file_compression_type = FileCompressionType::from_str(splitted)
+ .unwrap_or(FileCompressionType::UNCOMPRESSED);
+
+ if file_compression_type != FileCompressionType::UNCOMPRESSED {
+ splitted = exts.next().unwrap_or("");
}
+
+ let file_type = FileType::from_str(splitted)
+ .map_err(|_| DataFusionError::Internal(err_msg.to_owned()))?;
+
+ let ext = file_type
+ .get_ext_with_compression(file_compression_type.to_owned())
+ .map_err(|_| DataFusionError::Internal(err_msg))?;
+
+ let file_format: Arc<dyn FileFormat> = match file_type {
+ FileType::AVRO => Arc::new(AvroFormat::default()),
+ FileType::CSV => Arc::new(
+
CsvFormat::default().with_file_compression_type(file_compression_type),
+ ),
+ FileType::JSON => Arc::new(
+
JsonFormat::default().with_file_compression_type(file_compression_type),
+ ),
+ FileType::PARQUET => Arc::new(ParquetFormat::default()),
+ };
+
+ Ok((file_format, ext))
}
/// Infer `ListingOptions` based on `table_path` suffix.
@@ -130,16 +154,13 @@ impl ListingTableConfig {
.await
.ok_or_else(|| DataFusionError::Internal("No files for
table".into()))??;
- let file_type =
file.location.as_ref().rsplit('.').next().ok_or_else(|| {
- DataFusionError::Internal("Unable to infer file suffix".into())
- })?;
-
- let format = ListingTableConfig::infer_format(file_type)?;
+ let (format, file_extension) =
+ ListingTableConfig::infer_format(file.location.as_ref())?;
let listing_options = ListingOptions {
format,
collect_stat: true,
- file_extension: file_type.to_string(),
+ file_extension,
target_partitions: ctx.config.target_partitions,
table_partition_cols: vec![],
};
@@ -474,7 +495,7 @@ impl ListingTable {
#[cfg(test)]
mod tests {
- use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION;
+ use crate::datasource::file_format::file_type::GetExt;
use crate::prelude::SessionContext;
use crate::{
datasource::file_format::{avro::AvroFormat, parquet::ParquetFormat},
@@ -537,7 +558,7 @@ mod tests {
register_test_store(&ctx, &[(&path, 100)]);
let opt = ListingOptions {
- file_extension: DEFAULT_AVRO_EXTENSION.to_owned(),
+ file_extension: FileType::AVRO.get_ext(),
format: Arc::new(AvroFormat {}),
table_partition_cols: vec![String::from("p1")],
target_partitions: 4,
diff --git a/datafusion/core/src/execution/context.rs
b/datafusion/core/src/execution/context.rs
index d0baa035e..35670f21f 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -24,10 +24,7 @@ use crate::{
datasource::listing::{ListingOptions, ListingTable},
datasource::{
file_format::{
- avro::{AvroFormat, DEFAULT_AVRO_EXTENSION},
- csv::{CsvFormat, DEFAULT_CSV_EXTENSION},
- json::{JsonFormat, DEFAULT_JSON_EXTENSION},
- parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION},
+ avro::AvroFormat, csv::CsvFormat, json::JsonFormat,
parquet::ParquetFormat,
FileFormat,
},
MemTable, ViewTable,
@@ -42,6 +39,7 @@ use crate::{
pub use datafusion_physical_expr::execution_props::ExecutionProps;
use datafusion_physical_expr::var_provider::is_system_variables;
use parking_lot::RwLock;
+use std::str::FromStr;
use std::sync::Arc;
use std::{
any::{Any, TypeId},
@@ -81,6 +79,7 @@ use crate::config::{
OPT_FILTER_NULL_JOIN_KEYS, OPT_OPTIMIZER_SKIP_FAILED_RULES,
};
use crate::datasource::datasource::TableProviderFactory;
+use crate::datasource::file_format::file_type::{FileCompressionType, FileType};
use crate::execution::runtime_env::RuntimeEnv;
use crate::logical_expr::Explain;
use crate::physical_plan::file_format::{plan_to_csv, plan_to_json,
plan_to_parquet};
@@ -100,6 +99,7 @@ use datafusion_sql::{
planner::{ContextProvider, SqlToRel},
};
use parquet::file::properties::WriterProperties;
+
use uuid::Uuid;
use super::options::{
@@ -448,31 +448,38 @@ impl SessionContext {
&self,
cmd: &CreateExternalTable,
) -> Result<Arc<DataFrame>> {
- let (file_format, file_extension) = match cmd.file_type.as_str() {
- "CSV" => (
- Arc::new(
- CsvFormat::default()
- .with_has_header(cmd.has_header)
- .with_delimiter(cmd.delimiter as u8),
- ) as Arc<dyn FileFormat>,
- DEFAULT_CSV_EXTENSION,
- ),
- "PARQUET" => (
- Arc::new(ParquetFormat::default()) as Arc<dyn FileFormat>,
- DEFAULT_PARQUET_EXTENSION,
- ),
- "AVRO" => (
- Arc::new(AvroFormat::default()) as Arc<dyn FileFormat>,
- DEFAULT_AVRO_EXTENSION,
- ),
- "JSON" => (
- Arc::new(JsonFormat::default()) as Arc<dyn FileFormat>,
- DEFAULT_JSON_EXTENSION,
- ),
- _ => Err(DataFusionError::Execution(
+ let file_compression_type =
+ match
FileCompressionType::from_str(cmd.file_compression_type.as_str()) {
+ Ok(t) => t,
+ Err(_) => Err(DataFusionError::Execution(
+ "Only known FileCompressionTypes can be
ListingTables!".to_string(),
+ ))?,
+ };
+
+ let file_type = match FileType::from_str(cmd.file_type.as_str()) {
+ Ok(t) => t,
+ Err(_) => Err(DataFusionError::Execution(
"Only known FileTypes can be ListingTables!".to_string(),
))?,
};
+
+ let file_extension =
+
file_type.get_ext_with_compression(file_compression_type.to_owned())?;
+
+ let file_format: Arc<dyn FileFormat> = match file_type {
+ FileType::CSV => Arc::new(
+ CsvFormat::default()
+ .with_has_header(cmd.has_header)
+ .with_delimiter(cmd.delimiter as u8)
+ .with_file_compression_type(file_compression_type),
+ ),
+ FileType::PARQUET => Arc::new(ParquetFormat::default()),
+ FileType::AVRO => Arc::new(AvroFormat::default()),
+ FileType::JSON => Arc::new(
+
JsonFormat::default().with_file_compression_type(file_compression_type),
+ ),
+ };
+
let table = self.table(cmd.name.as_str());
match (cmd.if_not_exists, table) {
(true, Ok(_)) => self.return_empty_dataframe(),
diff --git a/datafusion/core/src/execution/options.rs
b/datafusion/core/src/execution/options.rs
index e296d18b6..9ddd3f1d6 100644
--- a/datafusion/core/src/execution/options.rs
+++ b/datafusion/core/src/execution/options.rs
@@ -21,13 +21,15 @@ use std::sync::Arc;
use arrow::datatypes::{Schema, SchemaRef};
+use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION;
+use crate::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
+use crate::datasource::file_format::file_type::FileCompressionType;
+use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION;
+use crate::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
use crate::datasource::{
file_format::{
- avro::{AvroFormat, DEFAULT_AVRO_EXTENSION},
- csv::{CsvFormat, DEFAULT_CSV_EXTENSION},
- json::{JsonFormat, DEFAULT_JSON_EXTENSION},
- parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION},
+ avro::AvroFormat, csv::CsvFormat, json::JsonFormat,
parquet::ParquetFormat,
},
listing::ListingOptions,
};
@@ -48,10 +50,13 @@ pub struct CsvReadOptions<'a> {
/// Max number of rows to read from CSV files for schema inference if
needed. Defaults to `DEFAULT_SCHEMA_INFER_MAX_RECORD`.
pub schema_infer_max_records: usize,
/// File extension; only files with this extension are selected for data
input.
- /// Defaults to DEFAULT_CSV_EXTENSION.
+ /// Defaults to `FileType::CSV.get_ext().as_str()`.
pub file_extension: &'a str,
/// Partition Columns
pub table_partition_cols: Vec<String>,
+
+ /// File compression type
+ pub file_compression_type: FileCompressionType,
}
impl<'a> Default for CsvReadOptions<'a> {
@@ -70,6 +75,7 @@ impl<'a> CsvReadOptions<'a> {
delimiter: b',',
file_extension: DEFAULT_CSV_EXTENSION,
table_partition_cols: vec![],
+ file_compression_type: FileCompressionType::UNCOMPRESSED,
}
}
@@ -117,12 +123,22 @@ impl<'a> CsvReadOptions<'a> {
self
}
+ /// Configure file compression type
+ pub fn file_compression_type(
+ mut self,
+ file_compression_type: FileCompressionType,
+ ) -> Self {
+ self.file_compression_type = file_compression_type;
+ self
+ }
+
/// Helper to convert these user facing options to `ListingTable` options
pub fn to_listing_options(&self, target_partitions: usize) ->
ListingOptions {
let file_format = CsvFormat::default()
.with_has_header(self.has_header)
.with_delimiter(self.delimiter)
- .with_schema_infer_max_rec(Some(self.schema_infer_max_records));
+ .with_schema_infer_max_rec(Some(self.schema_infer_max_records))
+ .with_file_compression_type(self.file_compression_type.to_owned());
ListingOptions {
format: Arc::new(file_format),
@@ -154,6 +170,7 @@ pub struct ParquetReadOptions<'a> {
impl<'a> Default for ParquetReadOptions<'a> {
fn default() -> Self {
let format_default = ParquetFormat::default();
+
Self {
file_extension: DEFAULT_PARQUET_EXTENSION,
table_partition_cols: vec![],
@@ -207,7 +224,7 @@ pub struct AvroReadOptions<'a> {
pub schema: Option<SchemaRef>,
/// File extension; only files with this extension are selected for data
input.
- /// Defaults to DEFAULT_AVRO_EXTENSION.
+ /// Defaults to `FileType::AVRO.get_ext().as_str()`.
pub file_extension: &'a str,
/// Partition Columns
pub table_partition_cols: Vec<String>,
@@ -254,10 +271,13 @@ pub struct NdJsonReadOptions<'a> {
pub schema_infer_max_records: usize,
/// File extension; only files with this extension are selected for data
input.
- /// Defaults to DEFAULT_JSON_EXTENSION.
+ /// Defaults to `FileType::JSON.get_ext().as_str()`.
pub file_extension: &'a str,
/// Partition Columns
pub table_partition_cols: Vec<String>,
+
+ /// File compression type
+ pub file_compression_type: FileCompressionType,
}
impl<'a> Default for NdJsonReadOptions<'a> {
@@ -267,6 +287,7 @@ impl<'a> Default for NdJsonReadOptions<'a> {
schema_infer_max_records: DEFAULT_SCHEMA_INFER_MAX_RECORD,
file_extension: DEFAULT_JSON_EXTENSION,
table_partition_cols: vec![],
+ file_compression_type: FileCompressionType::UNCOMPRESSED,
}
}
}
@@ -278,9 +299,25 @@ impl<'a> NdJsonReadOptions<'a> {
self
}
+ /// Specify file_extension
+ pub fn file_extension(mut self, file_extension: &'a str) -> Self {
+ self.file_extension = file_extension;
+ self
+ }
+
+ /// Specify file_compression_type
+ pub fn file_compression_type(
+ mut self,
+ file_compression_type: FileCompressionType,
+ ) -> Self {
+ self.file_compression_type = file_compression_type;
+ self
+ }
+
/// Helper to convert these user facing options to `ListingTable` options
pub fn to_listing_options(&self, target_partitions: usize) ->
ListingOptions {
- let file_format = JsonFormat::default();
+ let file_format = JsonFormat::default()
+ .with_file_compression_type(self.file_compression_type.to_owned());
ListingOptions {
format: Arc::new(file_format),
collect_stat: false,
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs
b/datafusion/core/src/physical_plan/file_format/csv.rs
index 885bea870..03e1e8059 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -17,6 +17,7 @@
//! Execution plan for reading CSV files
+use crate::datasource::file_format::file_type::FileCompressionType;
use crate::error::{DataFusionError, Result};
use crate::execution::context::{SessionState, TaskContext};
use crate::physical_plan::expressions::PhysicalSortExpr;
@@ -31,7 +32,9 @@ use crate::physical_plan::{
};
use arrow::csv;
use arrow::datatypes::SchemaRef;
+
use bytes::Buf;
+
use futures::{StreamExt, TryStreamExt};
use object_store::{GetResult, ObjectStore};
use std::any::Any;
@@ -52,11 +55,17 @@ pub struct CsvExec {
delimiter: u8,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
+ file_compression_type: FileCompressionType,
}
impl CsvExec {
/// Create a new CSV reader execution plan provided base and specific
configurations
- pub fn new(base_config: FileScanConfig, has_header: bool, delimiter: u8)
-> Self {
+ pub fn new(
+ base_config: FileScanConfig,
+ has_header: bool,
+ delimiter: u8,
+ file_compression_type: FileCompressionType,
+ ) -> Self {
let (projected_schema, projected_statistics) = base_config.project();
Self {
@@ -66,6 +75,7 @@ impl CsvExec {
has_header,
delimiter,
metrics: ExecutionPlanMetricsSet::new(),
+ file_compression_type,
}
}
@@ -132,7 +142,10 @@ impl ExecutionPlan for CsvExec {
delimiter: self.delimiter,
});
- let opener = CsvOpener { config };
+ let opener = CsvOpener {
+ config,
+ file_compression_type: self.file_compression_type.to_owned(),
+ };
let stream = FileStream::new(
&self.base_config,
partition,
@@ -194,6 +207,7 @@ impl CsvConfig {
struct CsvOpener {
config: Arc<CsvConfig>,
+ file_compression_type: FileCompressionType,
}
impl FileOpener for CsvOpener {
@@ -203,14 +217,18 @@ impl FileOpener for CsvOpener {
file_meta: FileMeta,
) -> Result<FileOpenFuture> {
let config = self.config.clone();
+ let file_compression_type = self.file_compression_type.to_owned();
Ok(Box::pin(async move {
match store.get(file_meta.location()).await? {
GetResult::File(file, _) => {
- Ok(futures::stream::iter(config.open(file, true)).boxed())
+ let decoder = file_compression_type.convert_read(file);
+ Ok(futures::stream::iter(config.open(decoder,
true)).boxed())
}
GetResult::Stream(s) => {
let mut first_chunk = true;
- Ok(newline_delimited_stream(s.map_err(Into::into))
+ let s = s.map_err(Into::<DataFusionError>::into);
+ let decoder = file_compression_type.convert_stream(s);
+ Ok(newline_delimited_stream(decoder)
.map_ok(move |bytes| {
let reader = config.open(bytes.reader(),
first_chunk);
first_chunk = false;
@@ -265,28 +283,48 @@ pub async fn plan_to_csv(
#[cfg(test)]
mod tests {
use super::*;
+ use crate::datasource::file_format::file_type::FileType;
use crate::physical_plan::file_format::chunked_store::ChunkedStore;
use crate::prelude::*;
- use crate::test::partitioned_csv_config;
- use crate::test_util::aggr_test_schema_with_missing_col;
+ use crate::test::{partitioned_csv_config, partitioned_file_groups};
+ use crate::test_util::{aggr_test_schema_with_missing_col, arrow_test_data};
use crate::{scalar::ScalarValue, test_util::aggr_test_schema};
use arrow::datatypes::*;
use futures::StreamExt;
use object_store::local::LocalFileSystem;
+ use rstest::*;
use std::fs::File;
use std::io::Write;
use tempfile::TempDir;
+ #[rstest(
+ file_compression_type,
+ case(FileCompressionType::UNCOMPRESSED),
+ case(FileCompressionType::GZIP),
+ case(FileCompressionType::BZIP2)
+ )]
#[tokio::test]
- async fn csv_exec_with_projection() -> Result<()> {
+ async fn csv_exec_with_projection(
+ file_compression_type: FileCompressionType,
+ ) -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let file_schema = aggr_test_schema();
+ let path = format!("{}/csv", arrow_test_data());
let filename = "aggregate_test_100.csv";
- let mut config = partitioned_csv_config(filename, file_schema, 1)?;
+
+ let file_groups = partitioned_file_groups(
+ path.as_str(),
+ filename,
+ 1,
+ FileType::CSV,
+ file_compression_type.to_owned(),
+ )?;
+
+ let mut config = partitioned_csv_config(file_schema, file_groups)?;
config.projection = Some(vec![0, 2, 4]);
- let csv = CsvExec::new(config, true, b',');
+ let csv = CsvExec::new(config, true, b',',
file_compression_type.to_owned());
assert_eq!(13, csv.base_config.file_schema.fields().len());
assert_eq!(3, csv.projected_schema.fields().len());
assert_eq!(3, csv.schema().fields().len());
@@ -313,16 +351,34 @@ mod tests {
Ok(())
}
+ #[rstest(
+ file_compression_type,
+ case(FileCompressionType::UNCOMPRESSED),
+ case(FileCompressionType::GZIP),
+ case(FileCompressionType::BZIP2)
+ )]
#[tokio::test]
- async fn csv_exec_with_limit() -> Result<()> {
+ async fn csv_exec_with_limit(
+ file_compression_type: FileCompressionType,
+ ) -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let file_schema = aggr_test_schema();
+ let path = format!("{}/csv", arrow_test_data());
let filename = "aggregate_test_100.csv";
- let mut config = partitioned_csv_config(filename, file_schema, 1)?;
+
+ let file_groups = partitioned_file_groups(
+ path.as_str(),
+ filename,
+ 1,
+ FileType::CSV,
+ file_compression_type.to_owned(),
+ )?;
+
+ let mut config = partitioned_csv_config(file_schema, file_groups)?;
config.limit = Some(5);
- let csv = CsvExec::new(config, true, b',');
+ let csv = CsvExec::new(config, true, b',',
file_compression_type.to_owned());
assert_eq!(13, csv.base_config.file_schema.fields().len());
assert_eq!(13, csv.projected_schema.fields().len());
assert_eq!(13, csv.schema().fields().len());
@@ -349,16 +405,34 @@ mod tests {
Ok(())
}
+ #[rstest(
+ file_compression_type,
+ case(FileCompressionType::UNCOMPRESSED),
+ case(FileCompressionType::GZIP),
+ case(FileCompressionType::BZIP2)
+ )]
#[tokio::test]
- async fn csv_exec_with_missing_column() -> Result<()> {
+ async fn csv_exec_with_missing_column(
+ file_compression_type: FileCompressionType,
+ ) -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let file_schema = aggr_test_schema_with_missing_col();
+ let path = format!("{}/csv", arrow_test_data());
let filename = "aggregate_test_100.csv";
- let mut config = partitioned_csv_config(filename, file_schema, 1)?;
+
+ let file_groups = partitioned_file_groups(
+ path.as_str(),
+ filename,
+ 1,
+ FileType::CSV,
+ file_compression_type.to_owned(),
+ )?;
+
+ let mut config = partitioned_csv_config(file_schema, file_groups)?;
config.limit = Some(5);
- let csv = CsvExec::new(config, true, b',');
+ let csv = CsvExec::new(config, true, b',',
file_compression_type.to_owned());
assert_eq!(14, csv.base_config.file_schema.fields().len());
assert_eq!(14, csv.projected_schema.fields().len());
assert_eq!(14, csv.schema().fields().len());
@@ -385,13 +459,31 @@ mod tests {
Ok(())
}
+ #[rstest(
+ file_compression_type,
+ case(FileCompressionType::UNCOMPRESSED),
+ case(FileCompressionType::GZIP),
+ case(FileCompressionType::BZIP2)
+ )]
#[tokio::test]
- async fn csv_exec_with_partition() -> Result<()> {
+ async fn csv_exec_with_partition(
+ file_compression_type: FileCompressionType,
+ ) -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let file_schema = aggr_test_schema();
+ let path = format!("{}/csv", arrow_test_data());
let filename = "aggregate_test_100.csv";
- let mut config = partitioned_csv_config(filename, file_schema.clone(),
1)?;
+
+ let file_groups = partitioned_file_groups(
+ path.as_str(),
+ filename,
+ 1,
+ FileType::CSV,
+ file_compression_type.to_owned(),
+ )?;
+
+ let mut config = partitioned_csv_config(file_schema, file_groups)?;
// Add partition columns
config.table_partition_cols = vec!["date".to_owned()];
@@ -400,11 +492,11 @@ mod tests {
// We should be able to project on the partition column
// Which is supposed to be after the file fields
- config.projection = Some(vec![0, file_schema.fields().len()]);
+ config.projection = Some(vec![0, config.file_schema.fields().len()]);
// we don't have `/date=xx/` in the path but that is ok because
// partitions are resolved during scan anyway
- let csv = CsvExec::new(config, true, b',');
+ let csv = CsvExec::new(config, true, b',',
file_compression_type.to_owned());
assert_eq!(13, csv.base_config.file_schema.fields().len());
assert_eq!(2, csv.projected_schema.fields().len());
assert_eq!(2, csv.schema().fields().len());
@@ -459,8 +551,14 @@ mod tests {
Ok(schema)
}
+ #[rstest(
+ file_compression_type,
+ case(FileCompressionType::UNCOMPRESSED),
+ case(FileCompressionType::GZIP),
+ case(FileCompressionType::BZIP2)
+ )]
#[tokio::test]
- async fn test_chunked() {
+ async fn test_chunked(file_compression_type: FileCompressionType) {
let ctx = SessionContext::new();
let chunk_sizes = [10, 20, 30, 40];
@@ -476,11 +574,21 @@ mod tests {
let task_ctx = ctx.task_ctx();
- let filename = "aggregate_test_100.csv";
let file_schema = aggr_test_schema();
- let config =
- partitioned_csv_config(filename, file_schema.clone(),
1).unwrap();
- let csv = CsvExec::new(config, true, b',');
+ let path = format!("{}/csv", arrow_test_data());
+ let filename = "aggregate_test_100.csv";
+
+ let file_groups = partitioned_file_groups(
+ path.as_str(),
+ filename,
+ 1,
+ FileType::CSV,
+ file_compression_type.to_owned(),
+ )
+ .unwrap();
+
+ let config = partitioned_csv_config(file_schema,
file_groups).unwrap();
+ let csv = CsvExec::new(config, true, b',',
file_compression_type.to_owned());
let it = csv.execute(0, task_ctx).unwrap();
let batches: Vec<_> = it.try_collect().await.unwrap();
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs
b/datafusion/core/src/physical_plan/file_format/json.rs
index 10f148ad0..9475be156 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -16,6 +16,7 @@
// under the License.
//! Execution plan for reading line-delimited JSON files
+use crate::datasource::file_format::file_type::FileCompressionType;
use crate::error::{DataFusionError, Result};
use crate::execution::context::SessionState;
use crate::execution::context::TaskContext;
@@ -31,7 +32,9 @@ use crate::physical_plan::{
};
use arrow::json::reader::DecoderOptions;
use arrow::{datatypes::SchemaRef, json};
+
use bytes::Buf;
+
use futures::{StreamExt, TryStreamExt};
use object_store::{GetResult, ObjectStore};
use std::any::Any;
@@ -50,11 +53,15 @@ pub struct NdJsonExec {
projected_schema: SchemaRef,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
+ file_compression_type: FileCompressionType,
}
impl NdJsonExec {
/// Create a new JSON reader execution plan provided base configurations
- pub fn new(base_config: FileScanConfig) -> Self {
+ pub fn new(
+ base_config: FileScanConfig,
+ file_compression_type: FileCompressionType,
+ ) -> Self {
let (projected_schema, projected_statistics) = base_config.project();
Self {
@@ -62,6 +69,7 @@ impl NdJsonExec {
projected_schema,
projected_statistics,
metrics: ExecutionPlanMetricsSet::new(),
+ file_compression_type,
}
}
}
@@ -118,6 +126,7 @@ impl ExecutionPlan for NdJsonExec {
let opener = JsonOpener {
file_schema,
options,
+ file_compression_type: self.file_compression_type.to_owned(),
};
let stream = FileStream::new(
@@ -156,6 +165,7 @@ impl ExecutionPlan for NdJsonExec {
struct JsonOpener {
options: DecoderOptions,
file_schema: SchemaRef,
+ file_compression_type: FileCompressionType,
}
impl FileOpener for JsonOpener {
@@ -166,14 +176,19 @@ impl FileOpener for JsonOpener {
) -> Result<FileOpenFuture> {
let options = self.options.clone();
let schema = self.file_schema.clone();
+ let file_compression_type = self.file_compression_type.to_owned();
Ok(Box::pin(async move {
match store.get(file_meta.location()).await? {
GetResult::File(file, _) => {
- let reader = json::Reader::new(file, schema.clone(),
options);
+ let decoder = file_compression_type.convert_read(file);
+ let reader = json::Reader::new(decoder, schema.clone(),
options);
Ok(futures::stream::iter(reader).boxed())
}
GetResult::Stream(s) => {
- Ok(newline_delimited_stream(s.map_err(Into::into))
+ let s = s.map_err(Into::into);
+ let decoder = file_compression_type.convert_stream(s);
+
+ Ok(newline_delimited_stream(decoder)
.map_ok(move |bytes| {
let reader = json::Reader::new(
bytes.reader(),
@@ -236,14 +251,17 @@ mod tests {
use object_store::local::LocalFileSystem;
use crate::assert_batches_eq;
+ use crate::datasource::file_format::file_type::FileType;
use crate::datasource::file_format::{json::JsonFormat, FileFormat};
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::physical_plan::file_format::chunked_store::ChunkedStore;
use crate::prelude::NdJsonReadOptions;
use crate::prelude::*;
- use crate::test::object_store::local_unpartitioned_file;
+ use crate::test::partitioned_file_groups;
+ use rstest::*;
use tempfile::TempDir;
+ use url::Url;
use super::*;
@@ -251,38 +269,65 @@ mod tests {
async fn prepare_store(
ctx: &SessionContext,
+ file_compression_type: FileCompressionType,
) -> (ObjectStoreUrl, Vec<Vec<PartitionedFile>>, SchemaRef) {
let store_url = ObjectStoreUrl::local_filesystem();
let store = ctx.runtime_env().object_store(&store_url).unwrap();
- let path = format!("{}/1.json", TEST_DATA_BASE);
- let meta = local_unpartitioned_file(path);
+ let filename = "1.json";
+ let file_groups = partitioned_file_groups(
+ TEST_DATA_BASE,
+ filename,
+ 1,
+ FileType::JSON,
+ file_compression_type.to_owned(),
+ )
+ .unwrap();
+ let meta = file_groups
+ .get(0)
+ .unwrap()
+ .get(0)
+ .unwrap()
+ .clone()
+ .object_meta;
let schema = JsonFormat::default()
+ .with_file_compression_type(file_compression_type.to_owned())
.infer_schema(&store, &[meta.clone()])
.await
.unwrap();
- (store_url, vec![vec![meta.into()]], schema)
+ (store_url, file_groups, schema)
}
+ #[rstest(
+ file_compression_type,
+ case(FileCompressionType::UNCOMPRESSED),
+ case(FileCompressionType::GZIP),
+ case(FileCompressionType::BZIP2)
+ )]
#[tokio::test]
- async fn nd_json_exec_file_without_projection() -> Result<()> {
+ async fn nd_json_exec_file_without_projection(
+ file_compression_type: FileCompressionType,
+ ) -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
use arrow::datatypes::DataType;
let (object_store_url, file_groups, file_schema) =
- prepare_store(&session_ctx).await;
-
- let exec = NdJsonExec::new(FileScanConfig {
- object_store_url,
- file_groups,
- file_schema,
- statistics: Statistics::default(),
- projection: None,
- limit: Some(3),
- table_partition_cols: vec![],
- });
+ prepare_store(&session_ctx,
file_compression_type.to_owned()).await;
+
+ let exec = NdJsonExec::new(
+ FileScanConfig {
+ object_store_url,
+ file_groups,
+ file_schema,
+ statistics: Statistics::default(),
+ projection: None,
+ limit: Some(3),
+ table_partition_cols: vec![],
+ },
+ file_compression_type.to_owned(),
+ );
// TODO: this is not where schema inference should be tested
@@ -324,13 +369,21 @@ mod tests {
Ok(())
}
+ #[rstest(
+ file_compression_type,
+ case(FileCompressionType::UNCOMPRESSED),
+ case(FileCompressionType::GZIP),
+ case(FileCompressionType::BZIP2)
+ )]
#[tokio::test]
- async fn nd_json_exec_file_with_missing_column() -> Result<()> {
+ async fn nd_json_exec_file_with_missing_column(
+ file_compression_type: FileCompressionType,
+ ) -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
use arrow::datatypes::DataType;
let (object_store_url, file_groups, actual_schema) =
- prepare_store(&session_ctx).await;
+ prepare_store(&session_ctx,
file_compression_type.to_owned()).await;
let mut fields = actual_schema.fields().clone();
fields.push(Field::new("missing_col", DataType::Int32, true));
@@ -338,15 +391,18 @@ mod tests {
let file_schema = Arc::new(Schema::new(fields));
- let exec = NdJsonExec::new(FileScanConfig {
- object_store_url,
- file_groups,
- file_schema,
- statistics: Statistics::default(),
- projection: None,
- limit: Some(3),
- table_partition_cols: vec![],
- });
+ let exec = NdJsonExec::new(
+ FileScanConfig {
+ object_store_url,
+ file_groups,
+ file_schema,
+ statistics: Statistics::default(),
+ projection: None,
+ limit: Some(3),
+ table_partition_cols: vec![],
+ },
+ file_compression_type.to_owned(),
+ );
let mut it = exec.execute(0, task_ctx)?;
let batch = it.next().await.unwrap()?;
@@ -365,22 +421,33 @@ mod tests {
Ok(())
}
+ #[rstest(
+ file_compression_type,
+ case(FileCompressionType::UNCOMPRESSED),
+ case(FileCompressionType::GZIP),
+ case(FileCompressionType::BZIP2)
+ )]
#[tokio::test]
- async fn nd_json_exec_file_projection() -> Result<()> {
+ async fn nd_json_exec_file_projection(
+ file_compression_type: FileCompressionType,
+ ) -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let (object_store_url, file_groups, file_schema) =
- prepare_store(&session_ctx).await;
-
- let exec = NdJsonExec::new(FileScanConfig {
- object_store_url,
- file_groups,
- file_schema,
- statistics: Statistics::default(),
- projection: Some(vec![0, 2]),
- limit: None,
- table_partition_cols: vec![],
- });
+ prepare_store(&session_ctx,
file_compression_type.to_owned()).await;
+
+ let exec = NdJsonExec::new(
+ FileScanConfig {
+ object_store_url,
+ file_groups,
+ file_schema,
+ statistics: Statistics::default(),
+ projection: Some(vec![0, 2]),
+ limit: None,
+ table_partition_cols: vec![],
+ },
+ file_compression_type.to_owned(),
+ );
let inferred_schema = exec.schema();
assert_eq!(inferred_schema.fields().len(), 2);
@@ -452,8 +519,14 @@ mod tests {
Ok(())
}
+ #[rstest(
+ file_compression_type,
+ case(FileCompressionType::UNCOMPRESSED),
+ case(FileCompressionType::GZIP),
+ case(FileCompressionType::BZIP2)
+ )]
#[tokio::test]
- async fn test_chunked() {
+ async fn test_chunked(file_compression_type: FileCompressionType) {
let mut ctx = SessionContext::new();
for chunk_size in [10, 20, 30, 40] {
@@ -466,8 +539,37 @@ mod tests {
)),
);
- let path = format!("{}/1.json", TEST_DATA_BASE);
- let frame = ctx.read_json(path, Default::default()).await.unwrap();
+ let filename = "1.json";
+ let file_groups = partitioned_file_groups(
+ TEST_DATA_BASE,
+ filename,
+ 1,
+ FileType::JSON,
+ file_compression_type.to_owned(),
+ )
+ .unwrap();
+ let path = file_groups
+ .get(0)
+ .unwrap()
+ .get(0)
+ .unwrap()
+ .object_meta
+ .location
+ .as_ref();
+
+ let store_url = ObjectStoreUrl::local_filesystem();
+ let url: &Url = store_url.as_ref();
+ let path_buf = Path::new(url.path()).join(path);
+ let path = path_buf.to_str().unwrap();
+
+ let ext = FileType::JSON
+ .get_ext_with_compression(file_compression_type.to_owned())
+ .unwrap();
+
+ let read_options = NdJsonReadOptions::default()
+ .file_extension(ext.as_str())
+ .file_compression_type(file_compression_type.to_owned());
+ let frame = ctx.read_json(path, read_options).await.unwrap();
let results = frame.collect().await.unwrap();
assert_batches_eq!(
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index 3b3b09567..dfc6d8edc 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -18,6 +18,8 @@
//! Common unit test utility methods
use crate::arrow::array::UInt32Array;
+use crate::datasource::file_format::file_type::{FileCompressionType, FileType};
+use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::datasource::{MemTable, TableProvider};
use crate::error::Result;
@@ -25,11 +27,15 @@ use crate::from_slice::FromSlice;
use crate::logical_plan::LogicalPlan;
use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
use crate::test::object_store::local_unpartitioned_file;
-use crate::test_util::aggr_test_schema;
+use crate::test_util::{aggr_test_schema, arrow_test_data};
use array::ArrayRef;
use arrow::array::{self, Array, Decimal128Builder, Int32Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
+use bzip2::write::BzEncoder;
+use bzip2::Compression as BzCompression;
+use flate2::write::GzEncoder;
+use flate2::Compression as GzCompression;
use futures::{Future, FutureExt};
use std::fs::File;
use std::io::prelude::*;
@@ -58,63 +64,99 @@ pub fn create_table_dual() -> Arc<dyn TableProvider> {
/// Returns a [`CsvExec`] that scans "aggregate_test_100.csv" with
`partitions` partitions
pub fn scan_partitioned_csv(partitions: usize) -> Result<Arc<CsvExec>> {
let schema = aggr_test_schema();
- let config = partitioned_csv_config("aggregate_test_100.csv", schema,
partitions)?;
- Ok(Arc::new(CsvExec::new(config, true, b',')))
+ let filename = "aggregate_test_100.csv";
+ let path = format!("{}/csv", arrow_test_data());
+ let file_groups = partitioned_file_groups(
+ path.as_str(),
+ filename,
+ partitions,
+ FileType::CSV,
+ FileCompressionType::UNCOMPRESSED,
+ )?;
+ let config = partitioned_csv_config(schema, file_groups)?;
+ Ok(Arc::new(CsvExec::new(
+ config,
+ true,
+ b',',
+ FileCompressionType::UNCOMPRESSED,
+ )))
}
-/// Returns a [`FileScanConfig`] for scanning `partitions` partitions of
`filename`
-pub fn partitioned_csv_config(
+/// Returns file groups [`Vec<Vec<PartitionedFile>>`] for scanning
`partitions` of `filename`
+pub fn partitioned_file_groups(
+ path: &str,
filename: &str,
- schema: SchemaRef,
partitions: usize,
-) -> Result<FileScanConfig> {
- let testdata = crate::test_util::arrow_test_data();
- let path = format!("{}/csv/{}", testdata, filename);
+ file_type: FileType,
+ file_compression_type: FileCompressionType,
+) -> Result<Vec<Vec<PartitionedFile>>> {
+ let path = format!("{}/{}", path, filename);
- let file_groups = if partitions > 1 {
- let tmp_dir = TempDir::new()?.into_path();
+ let tmp_dir = TempDir::new()?.into_path();
- let mut writers = vec![];
- let mut files = vec![];
- for i in 0..partitions {
- let filename = format!("partition-{}.csv", i);
- let filename = tmp_dir.join(&filename);
+ let mut writers = vec![];
+ let mut files = vec![];
+ for i in 0..partitions {
+ let filename = format!(
+ "partition-{}{}",
+ i,
+ file_type
+ .to_owned()
+ .get_ext_with_compression(file_compression_type.to_owned())
+ .unwrap()
+ );
+ let filename = tmp_dir.join(&filename);
- let writer = BufWriter::new(File::create(&filename).unwrap());
- writers.push(writer);
- files.push(filename);
- }
+ let file = File::create(&filename).unwrap();
- let f = File::open(&path)?;
- let f = BufReader::new(f);
- for (i, line) in f.lines().enumerate() {
- let line = line.unwrap();
-
- if i == 0 {
- // write header to all partitions
- for w in writers.iter_mut() {
- w.write_all(line.as_bytes()).unwrap();
- w.write_all(b"\n").unwrap();
- }
- } else {
- // write data line to single partition
- let partition = i % partitions;
- writers[partition].write_all(line.as_bytes()).unwrap();
- writers[partition].write_all(b"\n").unwrap();
+ let encoder: Box<dyn Write + Send> = match
file_compression_type.to_owned() {
+ FileCompressionType::UNCOMPRESSED => Box::new(file),
+ FileCompressionType::GZIP => {
+ Box::new(GzEncoder::new(file, GzCompression::default()))
}
+ FileCompressionType::BZIP2 => {
+ Box::new(BzEncoder::new(file, BzCompression::default()))
+ }
+ };
+
+ let writer = BufWriter::new(encoder);
+ writers.push(writer);
+ files.push(filename);
+ }
+
+ let f = File::open(&path)?;
+ let f = BufReader::new(f);
+ for (i, line) in f.lines().enumerate() {
+ let line = line.unwrap();
+
+ if i == 0 && file_type == FileType::CSV {
+ // write header to all partitions
+ for w in writers.iter_mut() {
+ w.write_all(line.as_bytes()).unwrap();
+ w.write_all(b"\n").unwrap();
+ }
+ } else {
+ // write data line to single partition
+ let partition = i % partitions;
+ writers[partition].write_all(line.as_bytes()).unwrap();
+ writers[partition].write_all(b"\n").unwrap();
}
- for w in writers.iter_mut() {
- w.flush().unwrap();
- }
+ }
+ for w in writers.iter_mut() {
+ w.flush().unwrap();
+ }
- files
- .into_iter()
- .map(|f| vec![local_unpartitioned_file(f).into()])
- .collect::<Vec<_>>()
- } else {
- vec![vec![local_unpartitioned_file(path).into()]]
- };
+ Ok(files
+ .into_iter()
+ .map(|f| vec![local_unpartitioned_file(f).into()])
+ .collect::<Vec<_>>())
+}
+/// Returns a [`FileScanConfig`] for given `file_groups`
+pub fn partitioned_csv_config(
+ schema: SchemaRef,
+ file_groups: Vec<Vec<PartitionedFile>>,
+) -> Result<FileScanConfig> {
Ok(FileScanConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
file_schema: schema,
diff --git a/datafusion/expr/src/logical_plan/plan.rs
b/datafusion/expr/src/logical_plan/plan.rs
index 7f5c95ab1..fd8280217 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -1268,6 +1268,8 @@ pub struct CreateExternalTable {
pub if_not_exists: bool,
/// SQL used to create the table, if available
pub definition: Option<String>,
+ /// File compression type (GZIP, BZIP2)
+ pub file_compression_type: String,
}
/// Produces a relation with string representations of
diff --git a/datafusion/proto/proto/datafusion.proto
b/datafusion/proto/proto/datafusion.proto
index 50545b581..9f2cc2d07 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -156,6 +156,7 @@ message CreateExternalTableNode {
bool if_not_exists = 7;
string delimiter = 8;
string definition = 9;
+ string file_compression_type = 10;
}
message CreateCatalogSchemaNode {
diff --git a/datafusion/proto/src/logical_plan.rs
b/datafusion/proto/src/logical_plan.rs
index 8f9f8a9c4..a5ddccdb6 100644
--- a/datafusion/proto/src/logical_plan.rs
+++ b/datafusion/proto/src/logical_plan.rs
@@ -492,6 +492,7 @@ impl AsLogicalPlan for LogicalPlanNode {
.table_partition_cols
.clone(),
if_not_exists: create_extern_table.if_not_exists,
+ file_compression_type:
create_extern_table.file_compression_type.to_string(),
definition,
}))
}
@@ -1042,6 +1043,7 @@ impl AsLogicalPlan for LogicalPlanNode {
table_partition_cols,
if_not_exists,
definition,
+ file_compression_type,
}) => Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::CreateExternalTable(
protobuf::CreateExternalTableNode {
@@ -1054,6 +1056,7 @@ impl AsLogicalPlan for LogicalPlanNode {
if_not_exists: *if_not_exists,
delimiter: String::from(*delimiter),
definition: definition.clone().unwrap_or_else(||
"".to_string()),
+ file_compression_type:
file_compression_type.to_string(),
},
)),
}),
diff --git a/datafusion/sql/src/parser.rs b/datafusion/sql/src/parser.rs
index b9254159b..cc9078e2a 100644
--- a/datafusion/sql/src/parser.rs
+++ b/datafusion/sql/src/parser.rs
@@ -38,6 +38,10 @@ fn parse_file_type(s: &str) -> Result<String, ParserError> {
Ok(s.to_uppercase())
}
+fn parse_file_compression_type(s: &str) -> Result<String, ParserError> {
+ Ok(s.to_uppercase())
+}
+
/// DataFusion extension DDL for `CREATE EXTERNAL TABLE`
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CreateExternalTable {
@@ -57,6 +61,8 @@ pub struct CreateExternalTable {
pub table_partition_cols: Vec<String>,
/// Option to not error if table already exists
pub if_not_exists: bool,
+ /// File compression type (GZIP, BZIP2)
+ pub file_compression_type: String,
}
impl fmt::Display for CreateExternalTable {
@@ -330,6 +336,12 @@ impl<'a> DFParser<'a> {
false => ',',
};
+ let file_compression_type = if self.parse_has_file_compression_type() {
+ self.parse_file_compression_type()?
+ } else {
+ "".to_string()
+ };
+
let table_partition_cols = if self.parse_has_partition() {
self.parse_partitions()?
} else {
@@ -348,6 +360,7 @@ impl<'a> DFParser<'a> {
location,
table_partition_cols,
if_not_exists,
+ file_compression_type,
};
Ok(Statement::CreateExternalTable(create))
}
@@ -360,6 +373,14 @@ impl<'a> DFParser<'a> {
}
}
+ /// Parses the set of
+ fn parse_file_compression_type(&mut self) -> Result<String, ParserError> {
+ match self.parser.next_token() {
+ Token::Word(w) => parse_file_compression_type(&w.value),
+ unexpected => self.expected("one of GZIP, BZIP2", unexpected),
+ }
+ }
+
fn consume_token(&mut self, expected: &Token) -> bool {
let token = self.parser.peek_token().to_string().to_uppercase();
let token = Token::make_keyword(&token);
@@ -370,6 +391,10 @@ impl<'a> DFParser<'a> {
false
}
}
+ fn parse_has_file_compression_type(&mut self) -> bool {
+ self.consume_token(&Token::make_keyword("COMPRESSION"))
+ & self.consume_token(&Token::make_keyword("TYPE"))
+ }
fn parse_csv_has_header(&mut self) -> bool {
self.consume_token(&Token::make_keyword("WITH"))
@@ -460,6 +485,7 @@ mod tests {
location: "foo.csv".into(),
table_partition_cols: vec![],
if_not_exists: false,
+ file_compression_type: "".to_string(),
});
expect_parse_ok(sql, expected)?;
@@ -475,6 +501,7 @@ mod tests {
location: "foo.csv".into(),
table_partition_cols: vec![],
if_not_exists: false,
+ file_compression_type: "".to_string(),
});
expect_parse_ok(sql, expected)?;
@@ -490,6 +517,7 @@ mod tests {
location: "foo.csv".into(),
table_partition_cols: vec!["p1".to_string(), "p2".to_string()],
if_not_exists: false,
+ file_compression_type: "".to_string(),
});
expect_parse_ok(sql, expected)?;
@@ -508,6 +536,27 @@ mod tests {
location: "foo.csv".into(),
table_partition_cols: vec![],
if_not_exists: false,
+ file_compression_type: "".to_string(),
+ });
+ expect_parse_ok(sql, expected)?;
+ }
+
+ // positive case: it is ok for sql stmt with `COMPRESSION TYPE GZIP`
tokens
+ let sqls = vec![
+ ("CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV COMPRESSION TYPE
GZIP LOCATION 'foo.csv'", "GZIP"),
+ ("CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV COMPRESSION TYPE
BZIP2 LOCATION 'foo.csv'", "BZIP2"),
+ ];
+ for (sql, file_compression_type) in sqls {
+ let expected = Statement::CreateExternalTable(CreateExternalTable {
+ name: "t".into(),
+ columns: vec![make_column_def("c1", DataType::Int(display))],
+ file_type: "CSV".to_string(),
+ has_header: false,
+ delimiter: ',',
+ location: "foo.csv".into(),
+ table_partition_cols: vec![],
+ if_not_exists: false,
+ file_compression_type: file_compression_type.to_owned(),
});
expect_parse_ok(sql, expected)?;
}
@@ -523,6 +572,7 @@ mod tests {
location: "foo.parquet".into(),
table_partition_cols: vec![],
if_not_exists: false,
+ file_compression_type: "".to_string(),
});
expect_parse_ok(sql, expected)?;
@@ -537,6 +587,7 @@ mod tests {
location: "foo.parquet".into(),
table_partition_cols: vec![],
if_not_exists: false,
+ file_compression_type: "".to_string(),
});
expect_parse_ok(sql, expected)?;
@@ -551,6 +602,7 @@ mod tests {
location: "foo.avro".into(),
table_partition_cols: vec![],
if_not_exists: false,
+ file_compression_type: "".to_string(),
});
expect_parse_ok(sql, expected)?;
@@ -566,6 +618,7 @@ mod tests {
location: "foo.parquet".into(),
table_partition_cols: vec![],
if_not_exists: true,
+ file_compression_type: "".to_string(),
});
expect_parse_ok(sql, expected)?;
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index 58b65af59..b75efd6f4 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -478,6 +478,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
location,
table_partition_cols,
if_not_exists,
+ file_compression_type,
} = statement;
// semantic checks
@@ -487,6 +488,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
))?;
}
+ if file_type != "CSV" && file_type != "JSON" &&
!file_compression_type.is_empty()
+ {
+ Err(DataFusionError::Plan(
+ "File compression type can be specified for CSV/JSON
files.".into(),
+ ))?;
+ }
+
let schema = self.build_schema(columns)?;
Ok(LogicalPlan::CreateExternalTable(PlanCreateExternalTable {
@@ -499,6 +507,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
table_partition_cols,
if_not_exists,
definition,
+ file_compression_type,
}))
}
@@ -3998,6 +4007,36 @@ mod tests {
quick_test(sql, expected);
}
+ #[test]
+ fn create_external_table_with_compression_type() {
+ // positive case
+ let sqls = vec![
+ "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV COMPRESSION TYPE
GZIP LOCATION 'foo.csv.gz'",
+ "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV COMPRESSION TYPE
BZIP2 LOCATION 'foo.csv.bz2'",
+ "CREATE EXTERNAL TABLE t(c1 int) STORED AS JSON COMPRESSION TYPE
GZIP LOCATION 'foo.json.gz'",
+ "CREATE EXTERNAL TABLE t(c1 int) STORED AS JSON COMPRESSION TYPE
BZIP2 LOCATION 'foo.json.bz2'",
+ ];
+ for sql in sqls {
+ let expected = "CreateExternalTable: \"t\"";
+ quick_test(sql, expected);
+ }
+
+ // negative case
+ let sqls = vec![
+ "CREATE EXTERNAL TABLE t STORED AS AVRO COMPRESSION TYPE GZIP
LOCATION 'foo.avro'",
+ "CREATE EXTERNAL TABLE t STORED AS AVRO COMPRESSION TYPE BZIP2
LOCATION 'foo.avro'",
+ "CREATE EXTERNAL TABLE t STORED AS PARQUET COMPRESSION TYPE GZIP
LOCATION 'foo.parquet'",
+ "CREATE EXTERNAL TABLE t STORED AS PARQUET COMPRESSION TYPE BZIP2
LOCATION 'foo.parquet'",
+ ];
+ for sql in sqls {
+ let err = logical_plan(sql).expect_err("query should have failed");
+ assert_eq!(
+ "Plan(\"File compression type can be specified for CSV/JSON
files.\")",
+ format!("{:?}", err)
+ );
+ }
+ }
+
#[test]
fn create_external_table_parquet() {
let sql =