This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 155d0a9aee when inferring the schema of compressed CSV, decompress before newline-delimited chunking (#5860)
155d0a9aee is described below

commit 155d0a9aeefb4ae8b01ce5dbe340230a36d3ece6
Author: zhenxing jiang <[email protected]>
AuthorDate: Wed Apr 12 05:05:26 2023 +0800

    when inferring the schema of compressed CSV, decompress before newline-delimited chunking (#5860)
---
 datafusion-examples/examples/csv_sql.rs            |  18 ++
 datafusion/core/src/datasource/file_format/csv.rs  | 187 ++++++++++++++++++---
 .../core/src/datasource/file_format/file_type.rs   |  91 ++++++----
 .../core/src/physical_plan/file_format/csv.rs      |   3 +-
 .../core/src/physical_plan/file_format/json.rs     |   3 +-
 testing                                            |   2 +-
 6 files changed, 239 insertions(+), 65 deletions(-)
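
The gist of the change: gzip, bzip2, xz, and zstd output is opaque to newline
scanning, so splitting the raw object-store bytes on b'\n' before decompressing
(the old order) yields chunks that are not whole CSV rows. Decompressing first
restores the whole-row invariant that schema inference depends on. Below is a
minimal standalone sketch of that ordering argument, using flate2 directly
rather than DataFusion's stream machinery; the sample data is invented for
illustration:

    use std::io::{Read, Write};

    use flate2::read::MultiGzDecoder;
    use flate2::write::GzEncoder;
    use flate2::Compression;

    fn main() -> std::io::Result<()> {
        // A tiny CSV payload standing in for aggregate_test_100.csv.
        let csv = b"c1,c2\na,1\nb,2\nc,3\n";

        // Compress it, as the file would sit on the object store.
        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(csv)?;
        let compressed = encoder.finish()?;

        // Wrong order: newline counts in the *compressed* bytes say nothing
        // about row boundaries.
        let newlines = compressed.iter().filter(|&&b| b == b'\n').count();
        println!("newlines visible in compressed bytes: {newlines}");

        // Right order (what this commit does): decompress, then chunk rows.
        let mut decoded = String::new();
        MultiGzDecoder::new(&compressed[..]).read_to_string(&mut decoded)?;
        assert_eq!(decoded.lines().count(), 4);
        Ok(())
    }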

diff --git a/datafusion-examples/examples/csv_sql.rs b/datafusion-examples/examples/csv_sql.rs
index ce602e0e48..c883a2076d 100644
--- a/datafusion-examples/examples/csv_sql.rs
+++ b/datafusion-examples/examples/csv_sql.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use datafusion::datasource::file_format::file_type::FileCompressionType;
 use datafusion::error::Result;
 use datafusion::prelude::*;
 
@@ -48,5 +49,22 @@ async fn main() -> Result<()> {
     // print the results
     df.show().await?;
 
+    // query compressed CSV with specific options
+    let csv_options = CsvReadOptions::default()
+        .has_header(true)
+        .file_compression_type(FileCompressionType::GZIP)
+        .file_extension("csv.gz");
+    let df = ctx
+        .read_csv(
+            &format!("{testdata}/csv/aggregate_test_100.csv.gz"),
+            csv_options,
+        )
+        .await?;
+    let df = df
+        .filter(col("c1").eq(lit("a")))?
+        .select_columns(&["c2", "c3"])?;
+
+    df.show().await?;
+
     Ok(())
 }
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index b3969bad90..ff64979b5c 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -30,6 +30,7 @@ use bytes::{Buf, Bytes};
 use datafusion_common::DataFusionError;
 
 use datafusion_physical_expr::PhysicalExpr;
+use futures::stream::BoxStream;
 use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
 use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};
 
@@ -65,6 +66,62 @@ impl Default for CsvFormat {
 }
 
 impl CsvFormat {
+    /// Return a newline delimited stream from the specified file on
+    /// object store, decompressing if necessary.
+    /// Each returned `Bytes` has a whole number of newline delimited rows.
+    async fn read_to_delimited_chunks(
+        &self,
+        store: &Arc<dyn ObjectStore>,
+        object: &ObjectMeta,
+    ) -> BoxStream<'static, Result<Bytes>> {
+        // stream to only read as many rows as needed into memory
+        let stream = store
+            .get(&object.location)
+            .await
+            .map_err(DataFusionError::ObjectStore);
+        let stream = match stream {
+            Ok(stream) => self
+                .read_to_delimited_chunks_from_stream(
+                    stream
+                        .into_stream()
+                        .map_err(DataFusionError::ObjectStore)
+                        .boxed(),
+                )
+                .await
+                .map_err(DataFusionError::from)
+                .left_stream(),
+            Err(e) => {
+                futures::stream::once(futures::future::ready(Err(e))).right_stream()
+            }
+        };
+        stream.boxed()
+    }
+
+    async fn read_to_delimited_chunks_from_stream(
+        &self,
+        stream: BoxStream<'static, Result<Bytes>>,
+    ) -> BoxStream<'static, Result<Bytes>> {
+        let file_compression_type = self.file_compression_type.to_owned();
+        let decoder = file_compression_type.convert_stream(stream);
+        let stream = match decoder {
+            Ok(decoded_stream) => {
+                newline_delimited_stream(decoded_stream.map_err(|e| match e {
+                    DataFusionError::ObjectStore(e) => e,
+                    err => object_store::Error::Generic {
+                        store: "read to delimited chunks failed",
+                        source: Box::new(err),
+                    },
+                }))
+                .map_err(DataFusionError::from)
+                .left_stream()
+            }
+            Err(e) => {
+                futures::stream::once(futures::future::ready(Err(e))).right_stream()
+            }
+        };
+        stream.boxed()
+    }
+
     /// Set a limit in terms of records to scan to infer the schema
     /// - default to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
     pub fn with_schema_infer_max_rec(mut self, max_rec: Option<usize>) -> Self {
@@ -124,8 +181,7 @@ impl FileFormat for CsvFormat {
         let mut records_to_read = self.schema_infer_max_rec.unwrap_or(usize::MAX);
 
         for object in objects {
-            // stream to only read as many rows as needed into memory
-            let stream = read_to_delimited_chunks(store, object).await;
+            let stream = self.read_to_delimited_chunks(store, object).await;
             let (schema, records_read) = self
                 .infer_schema_from_stream(records_to_read, stream)
                 .await?;
@@ -166,28 +222,6 @@ impl FileFormat for CsvFormat {
     }
 }
 
-/// Return a newline delimited stream from the specified file on
-/// object store
-///
-/// Each returned `Bytes` has a whole number of newline delimited rows
-async fn read_to_delimited_chunks(
-    store: &Arc<dyn ObjectStore>,
-    object: &ObjectMeta,
-) -> impl Stream<Item = Result<Bytes>> {
-    // stream to only read as many rows as needed into memory
-    let stream = store
-        .get(&object.location)
-        .await
-        .map_err(DataFusionError::ObjectStore);
-
-    match stream {
-        Ok(s) => newline_delimited_stream(s.into_stream())
-            .map_err(|e| DataFusionError::External(Box::new(e)))
-            .left_stream(),
-        Err(e) => futures::stream::once(futures::future::ready(Err(e))).right_stream(),
-    }
-}
-
 impl CsvFormat {
     /// Return the inferred schema reading up to records_to_read from a
     /// stream of delimited chunks returning the inferred schema and the
@@ -207,7 +241,7 @@ impl CsvFormat {
         while let Some(chunk) = stream.next().await.transpose()? {
             let (Schema { fields, .. }, records_read) =
                 arrow::csv::reader::infer_reader_schema(
-                    self.file_compression_type.convert_read(chunk.reader())?,
+                    chunk.reader(),
                     self.delimiter,
                     Some(records_to_read),
                     // only consider header for first chunk
@@ -295,14 +329,19 @@ fn build_schema_helper(names: Vec<String>, types: &[HashSet<DataType>]) -> Schem
 mod tests {
     use super::super::test_util::scan_format;
     use super::*;
+    use crate::assert_batches_eq;
     use crate::datasource::file_format::test_util::VariableStream;
     use crate::physical_plan::collect;
-    use crate::prelude::{SessionConfig, SessionContext};
+    use crate::prelude::{CsvReadOptions, SessionConfig, SessionContext};
+    use crate::test_util::arrow_test_data;
     use bytes::Bytes;
     use chrono::DateTime;
     use datafusion_common::cast::as_string_array;
+    use datafusion_expr::{col, lit};
     use futures::StreamExt;
+    use object_store::local::LocalFileSystem;
     use object_store::path::Path;
+    use rstest::*;
 
     #[tokio::test]
     async fn read_small_batches() -> Result<()> {
@@ -461,6 +500,102 @@ mod tests {
         Ok(())
     }
 
+    #[rstest(
+        file_compression_type,
+        case(FileCompressionType::UNCOMPRESSED),
+        case(FileCompressionType::GZIP),
+        case(FileCompressionType::BZIP2),
+        case(FileCompressionType::XZ),
+        case(FileCompressionType::ZSTD)
+    )]
+    #[tokio::test]
+    async fn query_compress_data(
+        file_compression_type: FileCompressionType,
+    ) -> Result<()> {
+        let integration = LocalFileSystem::new_with_prefix(arrow_test_data()).unwrap();
+
+        let path = Path::from("csv/aggregate_test_100.csv");
+        let csv = CsvFormat::default().with_has_header(true);
+        let records_to_read = csv.schema_infer_max_rec.unwrap_or(usize::MAX);
+        let store = Arc::new(integration) as Arc<dyn ObjectStore>;
+        let original_stream = store.get(&path).await?;
+
+        // convert original_stream to compressed_stream for next step
+        let compressed_stream =
+            file_compression_type.to_owned().convert_to_compress_stream(
+                original_stream
+                    .into_stream()
+                    .map_err(DataFusionError::from)
+                    .boxed(),
+            );
+
+        // prepare expected schema for assert_eq
+        let expected = Schema::new(vec![
+            Field::new("c1", DataType::Utf8, true),
+            Field::new("c2", DataType::Int64, true),
+            Field::new("c3", DataType::Int64, true),
+            Field::new("c4", DataType::Int64, true),
+            Field::new("c5", DataType::Int64, true),
+            Field::new("c6", DataType::Int64, true),
+            Field::new("c7", DataType::Int64, true),
+            Field::new("c8", DataType::Int64, true),
+            Field::new("c9", DataType::Int64, true),
+            Field::new("c10", DataType::Int64, true),
+            Field::new("c11", DataType::Float64, true),
+            Field::new("c12", DataType::Float64, true),
+            Field::new("c13", DataType::Utf8, true),
+        ]);
+
+        let compressed_csv =
+            csv.with_file_compression_type(file_compression_type.clone());
+
+        // convert compressed_stream to decoded_stream
+        let decoded_stream = compressed_csv
+            .read_to_delimited_chunks_from_stream(compressed_stream.unwrap())
+            .await;
+        let (schema, records_read) = compressed_csv
+            .infer_schema_from_stream(records_to_read, decoded_stream)
+            .await?;
+
+        assert_eq!(expected, schema);
+        assert_eq!(100, records_read);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn query_compress_csv() -> Result<()> {
+        let ctx = SessionContext::new();
+
+        let csv_options = CsvReadOptions::default()
+            .has_header(true)
+            .file_compression_type(FileCompressionType::GZIP)
+            .file_extension("csv.gz");
+        let df = ctx
+            .read_csv(
+                &format!("{}/csv/aggregate_test_100.csv.gz", arrow_test_data()),
+                csv_options,
+            )
+            .await?;
+
+        let record_batch = df
+            .filter(col("c1").eq(lit("a")).and(col("c2").gt(lit("4"))))?
+            .select_columns(&["c2", "c3"])?
+            .collect()
+            .await?;
+        #[rustfmt::skip]
+        let expected = vec![
+            "+----+------+",
+            "| c2 | c3   |",
+            "+----+------+",
+            "| 5  | 36   |",
+            "| 5  | -31  |",
+            "| 5  | -101 |",
+            "+----+------+",
+        ];
+        assert_batches_eq!(expected, &record_batch);
+        Ok(())
+    }
+
     async fn get_exec(
         state: &SessionState,
         file_name: &str,
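
A note on the invariant the new read_to_delimited_chunks_from_stream preserves:
after decompression, object_store's newline_delimited_stream re-buffers chunks
that arrive split at arbitrary byte offsets into chunks holding whole rows,
which is what infer_schema_from_stream relies on. A small sketch of that
behavior, assuming a tokio runtime and newline_delimited_stream's documented
whole-rows guarantee; the chunk split points are invented:

    use bytes::Bytes;
    use futures::{stream, TryStreamExt};
    use object_store::delimited::newline_delimited_stream;

    #[tokio::main]
    async fn main() -> object_store::Result<()> {
        // Chunks split mid-row, as store.get() may deliver them.
        let raw = stream::iter(vec![
            Ok(Bytes::from_static(b"c1,c2\na,")),
            Ok(Bytes::from_static(b"1\nb,2\nc,3\n")),
        ]);

        // Re-buffered so every emitted chunk ends on a row boundary.
        let chunks: Vec<Bytes> =
            newline_delimited_stream(raw).try_collect().await?;
        for chunk in &chunks {
            assert!(chunk.ends_with(b"\n"));
        }
        Ok(())
    }
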
diff --git a/datafusion/core/src/datasource/file_format/file_type.rs b/datafusion/core/src/datasource/file_format/file_type.rs
index e07eb8a3d7..e72b1e579e 100644
--- a/datafusion/core/src/datasource/file_format/file_type.rs
+++ b/datafusion/core/src/datasource/file_format/file_type.rs
@@ -25,16 +25,21 @@ use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION;
 use crate::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
 #[cfg(feature = "compression")]
 use async_compression::tokio::bufread::{
-    BzDecoder as AsyncBzDecoder, GzipDecoder as AsyncGzDecoder,
-    XzDecoder as AsyncXzDecoder, ZstdDecoder as AsyncZstdDecoer,
+    BzDecoder as AsyncBzDecoder, BzEncoder as AsyncBzEncoder,
+    GzipDecoder as AsyncGzDecoder, GzipEncoder as AsyncGzEncoder,
+    XzDecoder as AsyncXzDecoder, XzEncoder as AsyncXzEncoder,
+    ZstdDecoder as AsyncZstdDecoder, ZstdEncoder as AsyncZstdEncoder,
 };
+
 use bytes::Bytes;
 #[cfg(feature = "compression")]
 use bzip2::read::MultiBzDecoder;
 use datafusion_common::parsers::CompressionTypeVariant;
 #[cfg(feature = "compression")]
 use flate2::read::MultiGzDecoder;
-use futures::Stream;
+
+use futures::stream::BoxStream;
+use futures::StreamExt;
 #[cfg(feature = "compression")]
 use futures::TryStreamExt;
 use std::str::FromStr;
@@ -111,53 +116,67 @@ impl FileCompressionType {
         self.variant.is_compressed()
     }
 
-    /// Given a `Stream`, create a `Stream` which data are decompressed with `FileCompressionType`.
-    pub fn convert_stream<T: Stream<Item = Result<Bytes>> + Unpin + Send + 'static>(
+    /// Given a `Stream`, create a `Stream` whose data is compressed with `FileCompressionType`.
+    pub fn convert_to_compress_stream(
         &self,
-        s: T,
-    ) -> Result<Box<dyn Stream<Item = Result<Bytes>> + Send + Unpin>> {
-        #[cfg(feature = "compression")]
-        let err_converter = |e: std::io::Error| match e
-            .get_ref()
-            .and_then(|e| e.downcast_ref::<DataFusionError>())
-        {
-            Some(_) => {
-                *(e.into_inner()
-                    .unwrap()
-                    .downcast::<DataFusionError>()
-                    .unwrap())
+        s: BoxStream<'static, Result<Bytes>>,
+    ) -> Result<BoxStream<'static, Result<Bytes>>> {
+        Ok(match self.variant {
+            #[cfg(feature = "compression")]
+            GZIP => ReaderStream::new(AsyncGzEncoder::new(StreamReader::new(s)))
+                .map_err(DataFusionError::from)
+                .boxed(),
+            #[cfg(feature = "compression")]
+            BZIP2 => ReaderStream::new(AsyncBzEncoder::new(StreamReader::new(s)))
+                .map_err(DataFusionError::from)
+                .boxed(),
+            #[cfg(feature = "compression")]
+            XZ => ReaderStream::new(AsyncXzEncoder::new(StreamReader::new(s)))
+                .map_err(DataFusionError::from)
+                .boxed(),
+            #[cfg(feature = "compression")]
+            ZSTD => ReaderStream::new(AsyncZstdEncoder::new(StreamReader::new(s)))
+                .map_err(DataFusionError::from)
+                .boxed(),
+            #[cfg(not(feature = "compression"))]
+            GZIP | BZIP2 | XZ | ZSTD => {
+                return Err(DataFusionError::NotImplemented(
+                    "Compression feature is not enabled".to_owned(),
+                ))
             }
-            None => Into::<DataFusionError>::into(e),
-        };
+            UNCOMPRESSED => s.boxed(),
+        })
+    }
 
+    /// Given a `Stream`, create a `Stream` whose data is decompressed with `FileCompressionType`.
+    pub fn convert_stream(
+        &self,
+        s: BoxStream<'static, Result<Bytes>>,
+    ) -> Result<BoxStream<'static, Result<Bytes>>> {
         Ok(match self.variant {
             #[cfg(feature = "compression")]
-            GZIP => Box::new(
-                ReaderStream::new(AsyncGzDecoder::new(StreamReader::new(s)))
-                    .map_err(err_converter),
-            ),
+            GZIP => ReaderStream::new(AsyncGzDecoder::new(StreamReader::new(s)))
+                .map_err(DataFusionError::from)
+                .boxed(),
             #[cfg(feature = "compression")]
-            BZIP2 => Box::new(
-                ReaderStream::new(AsyncBzDecoder::new(StreamReader::new(s)))
-                    .map_err(err_converter),
-            ),
+            BZIP2 => ReaderStream::new(AsyncBzDecoder::new(StreamReader::new(s)))
+                .map_err(DataFusionError::from)
+                .boxed(),
             #[cfg(feature = "compression")]
-            XZ => Box::new(
-                ReaderStream::new(AsyncXzDecoder::new(StreamReader::new(s)))
-                    .map_err(err_converter),
-            ),
+            XZ => ReaderStream::new(AsyncXzDecoder::new(StreamReader::new(s)))
+                .map_err(DataFusionError::from)
+                .boxed(),
             #[cfg(feature = "compression")]
-            ZSTD => Box::new(
-                ReaderStream::new(AsyncZstdDecoer::new(StreamReader::new(s)))
-                    .map_err(err_converter),
-            ),
+            ZSTD => ReaderStream::new(AsyncZstdDecoder::new(StreamReader::new(s)))
+                .map_err(DataFusionError::from)
+                .boxed(),
             #[cfg(not(feature = "compression"))]
             GZIP | BZIP2 | XZ | ZSTD => {
                 return Err(DataFusionError::NotImplemented(
                     "Compression feature is not enabled".to_owned(),
                 ))
             }
-            UNCOMPRESSED => Box::new(s),
+            UNCOMPRESSED => s.boxed(),
         })
     }
 
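A compressed round trip in miniature: convert_to_compress_stream is the
write-side mirror of convert_stream, and the query_compress_data test above
exercises them back to back. A sketch against the public API as it appears in
this diff, assuming the default "compression" feature is enabled; the sample
bytes are invented:

    use bytes::Bytes;
    use datafusion::datasource::file_format::file_type::FileCompressionType;
    use datafusion::error::Result;
    use futures::{stream, StreamExt, TryStreamExt};

    #[tokio::main]
    async fn main() -> Result<()> {
        let codec = FileCompressionType::GZIP;
        let input = Bytes::from_static(b"c1,c2\na,1\nb,2\n");

        // Compress, then immediately decompress, through the two adapters.
        let plain = stream::iter(vec![Ok(input.clone())]).boxed();
        let compressed = codec.convert_to_compress_stream(plain)?;
        let decoded = codec.convert_stream(compressed)?;

        let bytes: Vec<u8> = decoded
            .try_collect::<Vec<Bytes>>()
            .await?
            .iter()
            .flat_map(|b| b.iter().copied())
            .collect();
        assert_eq!(bytes, input.as_ref());
        Ok(())
    }
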
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index d9075c84ad..9826d32d0f 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -246,7 +246,8 @@ impl FileOpener for CsvOpener {
                 GetResult::Stream(s) => {
                     let mut decoder = config.builder().build_decoder();
                     let s = s.map_err(DataFusionError::from);
-                    let mut input = file_compression_type.convert_stream(s)?.fuse();
+                    let mut input =
+                        file_compression_type.convert_stream(s.boxed())?.fuse();
                     let mut buffered = Bytes::new();
 
                     let s = futures::stream::poll_fn(move |cx| {
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index 379833bf4c..fb34175b85 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -191,7 +191,8 @@ impl FileOpener for JsonOpener {
                         .build_decoder()?;
 
                     let s = s.map_err(DataFusionError::from);
-                    let mut input = file_compression_type.convert_stream(s)?.fuse();
+                    let mut input =
+                        file_compression_type.convert_stream(s.boxed())?.fuse();
                     let mut buffered = Bytes::new();
 
                     let s = stream::poll_fn(move |cx| {
diff --git a/testing b/testing
index 5bab2f264a..47f7b56b25 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit 5bab2f264a23f5af68f69ea93d24ef1e8e77fc88
+Subproject commit 47f7b56b25683202c1fd957668e13f2abafc0f12
