alamb commented on code in PR #5860:
URL: https://github.com/apache/arrow-datafusion/pull/5860#discussion_r1160795323


##########
testing:
##########


Review Comment:
   I verified this is a commit on the master branch of arrow-testing: 👍
https://github.com/apache/arrow-testing/commit/47f7b56b25683202c1fd957668e13f2abafc0f12



##########
datafusion/core/src/datasource/file_format/csv.rs:
##########
@@ -65,6 +65,59 @@ impl Default for CsvFormat {
 }
 
 impl CsvFormat {
+    /// Return a newline delimited stream from the specified file on
+    /// Stream, decompressing if necessary
+    /// Each returned `Bytes` has a whole number of newline delimited rows
+    async fn read_to_delimited_chunks(

Review Comment:
   I feel like there must be a simpler way to express this code, but this does appear to work.
   
   I wonder if we could use `BoxStream` rather than `impl Stream....` 🤔 
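
To make the contract in the doc comment concrete ("each returned `Bytes` has a whole number of newline delimited rows"), here is a minimal synchronous sketch. It is not the PR's implementation: it uses plain `Vec<u8>` chunks instead of a `Stream` of `Bytes`, and the function name `rechunk_on_newlines` is hypothetical.

```rust
/// Regroup arbitrary byte chunks so that every emitted chunk ends on a
/// newline boundary. Trailing bytes without a final newline are emitted last.
/// Hypothetical helper for illustration only; not part of DataFusion.
fn rechunk_on_newlines<I>(chunks: I) -> Vec<Vec<u8>>
where
    I: IntoIterator<Item = Vec<u8>>,
{
    let mut out = Vec::new();
    // Bytes seen so far that do not yet end in a newline.
    let mut pending: Vec<u8> = Vec::new();

    for chunk in chunks {
        pending.extend_from_slice(&chunk);
        // Find the last newline in the buffered data, if any.
        let last_newline = pending.iter().rposition(|&b| b == b'\n');
        if let Some(pos) = last_newline {
            // Everything after the last newline is an incomplete row.
            let rest = pending.split_off(pos + 1);
            // Emit the complete rows and keep the incomplete tail buffered.
            out.push(std::mem::replace(&mut pending, rest));
        }
    }

    // Flush a final row that has no terminating newline.
    if !pending.is_empty() {
        out.push(pending);
    }
    out
}
```

A streaming version would keep the same `pending` buffer as state between polls; the sketch only shows the splitting rule.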



##########
datafusion/core/src/datasource/file_format/file_type.rs:
##########
@@ -111,6 +115,58 @@ impl FileCompressionType {
         self.variant.is_compressed()
     }
 
+    /// Given a `Stream`, create a `Stream` which data are decompressed with `FileCompressionType`.
+    pub fn convert_to_compress_stream<
+        T: Stream<Item = Result<Bytes>> + Unpin + Send + 'static,
+    >(
+        &self,
+        s: T,
+    ) -> Result<Box<dyn Stream<Item = Result<Bytes>> + Send + Unpin>> {
+        #[cfg(feature = "compression")]
+        let err_converter = |e: std::io::Error| match e
+            .get_ref()
+            .and_then(|e| e.downcast_ref::<DataFusionError>())
+        {
+            Some(_) => {
+                *(e.into_inner()
+                    .unwrap()
+                    .downcast::<DataFusionError>()
+                    .unwrap())
+            }
+            None => DataFusionError::from(e),
+        };
+
+        Ok(match self.variant {
+            #[cfg(feature = "compression")]
+            GZIP => Box::new(
+                ReaderStream::new(AsyncGzEncoder::new(StreamReader::new(s)))
+                    .map_err(err_converter),

Review Comment:
   I am not sure why we need the `err_converter` -- this worked for me locally:
   
   ```suggestion
               GZIP => Box::new(
                   ReaderStream::new(AsyncGzEncoder::new(StreamReader::new(s)))
                       .map_err(DataFusionError::from),
   ```
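
For context on how the two forms can differ, here is a small standard-library sketch. It assumes the domain error may already have been wrapped inside a `std::io::Error` before reaching the conversion, which this thread does not confirm. `SketchError`, `wrap`, and `unwrap_or_convert` are hypothetical stand-ins, not DataFusion types or APIs.

```rust
use std::error::Error;
use std::fmt;
use std::io;

// Hypothetical stand-in for `DataFusionError`; not the real type.
#[derive(Debug)]
enum SketchError {
    Io(io::Error),
    Plan(String),
}

impl fmt::Display for SketchError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SketchError::Io(e) => write!(f, "IO error: {}", e),
            SketchError::Plan(m) => write!(f, "Plan error: {}", m),
        }
    }
}

impl Error for SketchError {}

// Plain conversion, analogous to `.map_err(DataFusionError::from)`.
impl From<io::Error> for SketchError {
    fn from(e: io::Error) -> Self {
        SketchError::Io(e)
    }
}

// Wrap the domain error in an `io::Error`, as happens when a byte reader
// requires `io::Error` items (assumed here for illustration).
fn wrap(e: SketchError) -> io::Error {
    io::Error::new(io::ErrorKind::Other, e)
}

// Same shape as the closure in the diff: if the `io::Error` carries a
// `SketchError`, recover it; otherwise fall back to the plain conversion.
fn unwrap_or_convert(e: io::Error) -> SketchError {
    let carries_domain_error = e
        .get_ref()
        .map_or(false, |inner| inner.is::<SketchError>());
    if carries_domain_error {
        *e.into_inner().unwrap().downcast::<SketchError>().unwrap()
    } else {
        SketchError::from(e)
    }
}
```

With the plain `From` conversion, a wrapped `SketchError::Plan` comes back as `SketchError::Io` with the original error nested inside it. With `unwrap_or_convert` it comes back as `SketchError::Plan`. If no domain error is ever wrapped on this path, the two behave the same and the simpler form is enough.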


