This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 155d0a9aee when inferring the schema of compressed CSV, decompress
before newline-delimited chunking (#5860)
155d0a9aee is described below
commit 155d0a9aeefb4ae8b01ce5dbe340230a36d3ece6
Author: zhenxing jiang <[email protected]>
AuthorDate: Wed Apr 12 05:05:26 2023 +0800
when inferring the schema of compressed CSV, decompress before
newline-delimited chunking (#5860)
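The fix reorders schema inference for compressed CSV: the byte stream is now
decompressed first and only then split into newline-delimited chunks, rather
than chunking the raw compressed bytes and decompressing each chunk. As a
hedged, standalone illustration of why the order matters (plain flate2, not
DataFusion code; the sample CSV is made up):

    use flate2::read::GzDecoder;
    use flate2::write::GzEncoder;
    use flate2::Compression;
    use std::io::{Read, Write};

    fn main() -> std::io::Result<()> {
        let csv = b"c1,c2\na,1\nb,2\n";

        // Gzip the CSV as a single frame.
        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(csv)?;
        let gz = encoder.finish()?;

        // Wrong order: splitting the *compressed* bytes on b'\n' cuts the
        // gzip frame at arbitrary points; the pieces are not decodable rows.
        let bogus = gz.split(|b| *b == b'\n').count();
        println!("newline splits over raw gzip bytes: {bogus} (meaningless)");

        // Right order: decompress the whole stream, then split into rows.
        let mut decoded = Vec::new();
        GzDecoder::new(&gz[..]).read_to_end(&mut decoded)?;
        let rows = decoded
            .split(|b| *b == b'\n')
            .filter(|row| !row.is_empty())
            .count();
        println!("rows after decompressing first: {rows}"); // 3
        Ok(())
    }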
---
datafusion-examples/examples/csv_sql.rs | 18 ++
datafusion/core/src/datasource/file_format/csv.rs | 187 ++++++++++++++++++---
.../core/src/datasource/file_format/file_type.rs | 91 ++++++----
.../core/src/physical_plan/file_format/csv.rs | 3 +-
.../core/src/physical_plan/file_format/json.rs | 3 +-
testing | 2 +-
6 files changed, 239 insertions(+), 65 deletions(-)
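From the user's side, the change is exercised through the public CSV read
options, as in the example added to datafusion-examples/examples/csv_sql.rs
below. A minimal usage sketch (the local path is illustrative and assumes a
gzip-compressed copy of aggregate_test_100.csv with a header row, plus tokio
with the macros feature enabled):

    use datafusion::datasource::file_format::file_type::FileCompressionType;
    use datafusion::error::Result;
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();

        // Declare the file as gzip-compressed; schema inference now
        // decompresses the stream before chunking it into rows.
        let options = CsvReadOptions::default()
            .has_header(true)
            .file_compression_type(FileCompressionType::GZIP)
            .file_extension("csv.gz");

        let df = ctx
            .read_csv("testdata/aggregate_test_100.csv.gz", options)
            .await?;

        df.filter(col("c1").eq(lit("a")))?
            .select_columns(&["c2", "c3"])?
            .show()
            .await?;
        Ok(())
    }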
diff --git a/datafusion-examples/examples/csv_sql.rs b/datafusion-examples/examples/csv_sql.rs
index ce602e0e48..c883a2076d 100644
--- a/datafusion-examples/examples/csv_sql.rs
+++ b/datafusion-examples/examples/csv_sql.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use datafusion::datasource::file_format::file_type::FileCompressionType;
use datafusion::error::Result;
use datafusion::prelude::*;
@@ -48,5 +49,22 @@ async fn main() -> Result<()> {
// print the results
df.show().await?;
+ // query compressed CSV with specific options
+ let csv_options = CsvReadOptions::default()
+ .has_header(true)
+ .file_compression_type(FileCompressionType::GZIP)
+ .file_extension("csv.gz");
+ let df = ctx
+ .read_csv(
+ &format!("{testdata}/csv/aggregate_test_100.csv.gz"),
+ csv_options,
+ )
+ .await?;
+ let df = df
+ .filter(col("c1").eq(lit("a")))?
+ .select_columns(&["c2", "c3"])?;
+
+ df.show().await?;
+
Ok(())
}
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index b3969bad90..ff64979b5c 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -30,6 +30,7 @@ use bytes::{Buf, Bytes};
use datafusion_common::DataFusionError;
use datafusion_physical_expr::PhysicalExpr;
+use futures::stream::BoxStream;
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};
@@ -65,6 +66,62 @@ impl Default for CsvFormat {
}
impl CsvFormat {
+ /// Return a newline delimited stream from the specified file on the
+ /// object store, decompressing if necessary
+ /// Each returned `Bytes` has a whole number of newline delimited rows
+ async fn read_to_delimited_chunks(
+ &self,
+ store: &Arc<dyn ObjectStore>,
+ object: &ObjectMeta,
+ ) -> BoxStream<'static, Result<Bytes>> {
+ // stream to only read as many rows as needed into memory
+ let stream = store
+ .get(&object.location)
+ .await
+ .map_err(DataFusionError::ObjectStore);
+ let stream = match stream {
+ Ok(stream) => self
+ .read_to_delimited_chunks_from_stream(
+ stream
+ .into_stream()
+ .map_err(DataFusionError::ObjectStore)
+ .boxed(),
+ )
+ .await
+ .map_err(DataFusionError::from)
+ .left_stream(),
+ Err(e) => {
+ futures::stream::once(futures::future::ready(Err(e))).right_stream()
+ }
+ };
+ stream.boxed()
+ }
+
+ async fn read_to_delimited_chunks_from_stream(
+ &self,
+ stream: BoxStream<'static, Result<Bytes>>,
+ ) -> BoxStream<'static, Result<Bytes>> {
+ let file_compression_type = self.file_compression_type.to_owned();
+ let decoder = file_compression_type.convert_stream(stream);
+ let stream = match decoder {
+ Ok(decoded_stream) => {
+ newline_delimited_stream(decoded_stream.map_err(|e| match e {
+ DataFusionError::ObjectStore(e) => e,
+ err => object_store::Error::Generic {
+ store: "read to delimited chunks failed",
+ source: Box::new(err),
+ },
+ }))
+ .map_err(DataFusionError::from)
+ .left_stream()
+ }
+ Err(e) => {
+ futures::stream::once(futures::future::ready(Err(e))).right_stream()
+ }
+ };
+ stream.boxed()
+ }
+
/// Set a limit in terms of records to scan to infer the schema
/// - default to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
pub fn with_schema_infer_max_rec(mut self, max_rec: Option<usize>) -> Self {
@@ -124,8 +181,7 @@ impl FileFormat for CsvFormat {
let mut records_to_read =
self.schema_infer_max_rec.unwrap_or(usize::MAX);
for object in objects {
- // stream to only read as many rows as needed into memory
- let stream = read_to_delimited_chunks(store, object).await;
+ let stream = self.read_to_delimited_chunks(store, object).await;
let (schema, records_read) = self
.infer_schema_from_stream(records_to_read, stream)
.await?;
@@ -166,28 +222,6 @@ impl FileFormat for CsvFormat {
}
}
-/// Return a newline delimited stream from the specified file on
-/// object store
-///
-/// Each returned `Bytes` has a whole number of newline delimited rows
-async fn read_to_delimited_chunks(
- store: &Arc<dyn ObjectStore>,
- object: &ObjectMeta,
-) -> impl Stream<Item = Result<Bytes>> {
- // stream to only read as many rows as needed into memory
- let stream = store
- .get(&object.location)
- .await
- .map_err(DataFusionError::ObjectStore);
-
- match stream {
- Ok(s) => newline_delimited_stream(s.into_stream())
- .map_err(|e| DataFusionError::External(Box::new(e)))
- .left_stream(),
- Err(e) => futures::stream::once(futures::future::ready(Err(e))).right_stream(),
- }
-}
-
impl CsvFormat {
/// Return the inferred schema reading up to records_to_read from a
/// stream of delimited chunks returning the inferred schema and the
@@ -207,7 +241,7 @@ impl CsvFormat {
while let Some(chunk) = stream.next().await.transpose()? {
let (Schema { fields, .. }, records_read) =
arrow::csv::reader::infer_reader_schema(
- self.file_compression_type.convert_read(chunk.reader())?,
+ chunk.reader(),
self.delimiter,
Some(records_to_read),
// only consider header for first chunk
@@ -295,14 +329,19 @@ fn build_schema_helper(names: Vec<String>, types: &[HashSet<DataType>]) -> Schem
mod tests {
use super::super::test_util::scan_format;
use super::*;
+ use crate::assert_batches_eq;
use crate::datasource::file_format::test_util::VariableStream;
use crate::physical_plan::collect;
- use crate::prelude::{SessionConfig, SessionContext};
+ use crate::prelude::{CsvReadOptions, SessionConfig, SessionContext};
+ use crate::test_util::arrow_test_data;
use bytes::Bytes;
use chrono::DateTime;
use datafusion_common::cast::as_string_array;
+ use datafusion_expr::{col, lit};
use futures::StreamExt;
+ use object_store::local::LocalFileSystem;
use object_store::path::Path;
+ use rstest::*;
#[tokio::test]
async fn read_small_batches() -> Result<()> {
@@ -461,6 +500,102 @@ mod tests {
Ok(())
}
+ #[rstest(
+ file_compression_type,
+ case(FileCompressionType::UNCOMPRESSED),
+ case(FileCompressionType::GZIP),
+ case(FileCompressionType::BZIP2),
+ case(FileCompressionType::XZ),
+ case(FileCompressionType::ZSTD)
+ )]
+ #[tokio::test]
+ async fn query_compress_data(
+ file_compression_type: FileCompressionType,
+ ) -> Result<()> {
+ let integration = LocalFileSystem::new_with_prefix(arrow_test_data()).unwrap();
+
+ let path = Path::from("csv/aggregate_test_100.csv");
+ let csv = CsvFormat::default().with_has_header(true);
+ let records_to_read = csv.schema_infer_max_rec.unwrap_or(usize::MAX);
+ let store = Arc::new(integration) as Arc<dyn ObjectStore>;
+ let original_stream = store.get(&path).await?;
+
+ //convert original_stream to compressed_stream for next step
+ let compressed_stream =
+ file_compression_type.to_owned().convert_to_compress_stream(
+ original_stream
+ .into_stream()
+ .map_err(DataFusionError::from)
+ .boxed(),
+ );
+
+ //prepare expected schema for assert_eq
+ let expected = Schema::new(vec![
+ Field::new("c1", DataType::Utf8, true),
+ Field::new("c2", DataType::Int64, true),
+ Field::new("c3", DataType::Int64, true),
+ Field::new("c4", DataType::Int64, true),
+ Field::new("c5", DataType::Int64, true),
+ Field::new("c6", DataType::Int64, true),
+ Field::new("c7", DataType::Int64, true),
+ Field::new("c8", DataType::Int64, true),
+ Field::new("c9", DataType::Int64, true),
+ Field::new("c10", DataType::Int64, true),
+ Field::new("c11", DataType::Float64, true),
+ Field::new("c12", DataType::Float64, true),
+ Field::new("c13", DataType::Utf8, true),
+ ]);
+
+ let compressed_csv =
+ csv.with_file_compression_type(file_compression_type.clone());
+
+ //convert compressed_stream to decoded_stream
+ let decoded_stream = compressed_csv
+ .read_to_delimited_chunks_from_stream(compressed_stream.unwrap())
+ .await;
+ let (schema, records_read) = compressed_csv
+ .infer_schema_from_stream(records_to_read, decoded_stream)
+ .await?;
+
+ assert_eq!(expected, schema);
+ assert_eq!(100, records_read);
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn query_compress_csv() -> Result<()> {
+ let ctx = SessionContext::new();
+
+ let csv_options = CsvReadOptions::default()
+ .has_header(true)
+ .file_compression_type(FileCompressionType::GZIP)
+ .file_extension("csv.gz");
+ let df = ctx
+ .read_csv(
+ &format!("{}/csv/aggregate_test_100.csv.gz", arrow_test_data()),
+ csv_options,
+ )
+ .await?;
+
+ let record_batch = df
+ .filter(col("c1").eq(lit("a")).and(col("c2").gt(lit("4"))))?
+ .select_columns(&["c2", "c3"])?
+ .collect()
+ .await?;
+ #[rustfmt::skip]
+ let expected = vec![
+ "+----+------+",
+ "| c2 | c3 |",
+ "+----+------+",
+ "| 5 | 36 |",
+ "| 5 | -31 |",
+ "| 5 | -101 |",
+ "+----+------+",
+ ];
+ assert_batches_eq!(expected, &record_batch);
+ Ok(())
+ }
+
async fn get_exec(
state: &SessionState,
file_name: &str,
diff --git a/datafusion/core/src/datasource/file_format/file_type.rs b/datafusion/core/src/datasource/file_format/file_type.rs
index e07eb8a3d7..e72b1e579e 100644
--- a/datafusion/core/src/datasource/file_format/file_type.rs
+++ b/datafusion/core/src/datasource/file_format/file_type.rs
@@ -25,16 +25,21 @@ use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION;
use crate::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
#[cfg(feature = "compression")]
use async_compression::tokio::bufread::{
- BzDecoder as AsyncBzDecoder, GzipDecoder as AsyncGzDecoder,
- XzDecoder as AsyncXzDecoder, ZstdDecoder as AsyncZstdDecoer,
+ BzDecoder as AsyncBzDecoder, BzEncoder as AsyncBzEncoder,
+ GzipDecoder as AsyncGzDecoder, GzipEncoder as AsyncGzEncoder,
+ XzDecoder as AsyncXzDecoder, XzEncoder as AsyncXzEncoder,
+ ZstdDecoder as AsyncZstdDecoer, ZstdEncoder as AsyncZstdEncoder,
};
+
use bytes::Bytes;
#[cfg(feature = "compression")]
use bzip2::read::MultiBzDecoder;
use datafusion_common::parsers::CompressionTypeVariant;
#[cfg(feature = "compression")]
use flate2::read::MultiGzDecoder;
-use futures::Stream;
+
+use futures::stream::BoxStream;
+use futures::StreamExt;
#[cfg(feature = "compression")]
use futures::TryStreamExt;
use std::str::FromStr;
@@ -111,53 +116,67 @@ impl FileCompressionType {
self.variant.is_compressed()
}
- /// Given a `Stream`, create a `Stream` which data are decompressed with `FileCompressionType`.
- pub fn convert_stream<T: Stream<Item = Result<Bytes>> + Unpin + Send + 'static>(
+ /// Given a `Stream`, create a `Stream` which data are compressed with `FileCompressionType`.
+ pub fn convert_to_compress_stream(
&self,
- s: T,
- ) -> Result<Box<dyn Stream<Item = Result<Bytes>> + Send + Unpin>> {
- #[cfg(feature = "compression")]
- let err_converter = |e: std::io::Error| match e
- .get_ref()
- .and_then(|e| e.downcast_ref::<DataFusionError>())
- {
- Some(_) => {
- *(e.into_inner()
- .unwrap()
- .downcast::<DataFusionError>()
- .unwrap())
+ s: BoxStream<'static, Result<Bytes>>,
+ ) -> Result<BoxStream<'static, Result<Bytes>>> {
+ Ok(match self.variant {
+ #[cfg(feature = "compression")]
+ GZIP => ReaderStream::new(AsyncGzEncoder::new(StreamReader::new(s)))
+ .map_err(DataFusionError::from)
+ .boxed(),
+ #[cfg(feature = "compression")]
+ BZIP2 => ReaderStream::new(AsyncBzEncoder::new(StreamReader::new(s)))
+ .map_err(DataFusionError::from)
+ .boxed(),
+ #[cfg(feature = "compression")]
+ XZ => ReaderStream::new(AsyncXzEncoder::new(StreamReader::new(s)))
+ .map_err(DataFusionError::from)
+ .boxed(),
+ #[cfg(feature = "compression")]
+ ZSTD => ReaderStream::new(AsyncZstdEncoder::new(StreamReader::new(s)))
+ .map_err(DataFusionError::from)
+ .boxed(),
+ #[cfg(not(feature = "compression"))]
+ GZIP | BZIP2 | XZ | ZSTD => {
+ return Err(DataFusionError::NotImplemented(
+ "Compression feature is not enabled".to_owned(),
+ ))
}
- None => Into::<DataFusionError>::into(e),
- };
+ UNCOMPRESSED => s.boxed(),
+ })
+ }
+ /// Given a `Stream`, create a `Stream` which data are decompressed with `FileCompressionType`.
+ pub fn convert_stream(
+ &self,
+ s: BoxStream<'static, Result<Bytes>>,
+ ) -> Result<BoxStream<'static, Result<Bytes>>> {
Ok(match self.variant {
#[cfg(feature = "compression")]
- GZIP => Box::new(
- ReaderStream::new(AsyncGzDecoder::new(StreamReader::new(s)))
- .map_err(err_converter),
- ),
+ GZIP => ReaderStream::new(AsyncGzDecoder::new(StreamReader::new(s)))
+ .map_err(DataFusionError::from)
+ .boxed(),
#[cfg(feature = "compression")]
- BZIP2 => Box::new(
- ReaderStream::new(AsyncBzDecoder::new(StreamReader::new(s)))
- .map_err(err_converter),
- ),
+ BZIP2 => ReaderStream::new(AsyncBzDecoder::new(StreamReader::new(s)))
+ .map_err(DataFusionError::from)
+ .boxed(),
#[cfg(feature = "compression")]
- XZ => Box::new(
- ReaderStream::new(AsyncXzDecoder::new(StreamReader::new(s)))
- .map_err(err_converter),
- ),
+ XZ => ReaderStream::new(AsyncXzDecoder::new(StreamReader::new(s)))
+ .map_err(DataFusionError::from)
+ .boxed(),
#[cfg(feature = "compression")]
- ZSTD => Box::new(
- ReaderStream::new(AsyncZstdDecoer::new(StreamReader::new(s)))
- .map_err(err_converter),
- ),
+ ZSTD => ReaderStream::new(AsyncZstdDecoer::new(StreamReader::new(s)))
+ .map_err(DataFusionError::from)
+ .boxed(),
#[cfg(not(feature = "compression"))]
GZIP | BZIP2 | XZ | ZSTD => {
return Err(DataFusionError::NotImplemented(
"Compression feature is not enabled".to_owned(),
))
}
- UNCOMPRESSED => Box::new(s),
+ UNCOMPRESSED => s.boxed(),
})
}
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index d9075c84ad..9826d32d0f 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -246,7 +246,8 @@ impl FileOpener for CsvOpener {
GetResult::Stream(s) => {
let mut decoder = config.builder().build_decoder();
let s = s.map_err(DataFusionError::from);
- let mut input = file_compression_type.convert_stream(s)?.fuse();
+ let mut input =
+ file_compression_type.convert_stream(s.boxed())?.fuse();
let mut buffered = Bytes::new();
let s = futures::stream::poll_fn(move |cx| {
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index 379833bf4c..fb34175b85 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -191,7 +191,8 @@ impl FileOpener for JsonOpener {
.build_decoder()?;
let s = s.map_err(DataFusionError::from);
- let mut input = file_compression_type.convert_stream(s)?.fuse();
+ let mut input =
+ file_compression_type.convert_stream(s.boxed())?.fuse();
let mut buffered = Bytes::new();
let s = stream::poll_fn(move |cx| {
diff --git a/testing b/testing
index 5bab2f264a..47f7b56b25 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit 5bab2f264a23f5af68f69ea93d24ef1e8e77fc88
+Subproject commit 47f7b56b25683202c1fd957668e13f2abafc0f12