tustvold commented on code in PR #7282:
URL: https://github.com/apache/arrow-datafusion/pull/7282#discussion_r1299178678


##########
datafusion/core/src/datasource/physical_plan/csv.rs:
##########
@@ -504,57 +447,41 @@ impl FileOpener for CsvOpener {
                     } else {
                         0
                     };
-                    (start + start_delta, end + end_delta)
+                    let range = start + start_delta..end + end_delta;
+                    if range.start == range.end {
+                        return Ok(
+                            futures::stream::poll_fn(move |_| Poll::Ready(None)).boxed()
+                        );
+                    }
+                    Some(range)
                 }
             };
 
-            // For special case: If `Range` has equal `start` and `end`, object store will fetch
-            // the whole file
-            let localfs: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
-            let is_localfs = localfs.type_id() == config.object_store.type_id();
-            if start_byte == end_byte && !is_localfs {
-                return Ok(futures::stream::poll_fn(move |_| Poll::Ready(None)).boxed());
-            }
-
             let options = GetOptions {
-                range: Some(Range {
-                    start: start_byte,
-                    end: end_byte,
-                }),
+                range,
                 ..Default::default()
             };
-
-            match config
+            let result = config
                 .object_store
                 .get_opts(file_meta.location(), options)
-                .await?
-            {
-                GetResult::File(file, _) => {
+                .await?;
+
+            match result.payload {
+                GetResultPayload::File(mut file, _) => {
                     let is_whole_file_scanned = file_meta.range.is_none();
                     let decoder = if is_whole_file_scanned {
-                        // For special case: `get_range()` will interpret `start` and `end` as the
-                        // byte range after decompression for compressed files
+                        // Don't seek if no range as breaks FIFO files

Review Comment:
   I think the question is perhaps more whether the streaming use-cases should be operators in their own right, instead of both streaming and non-streaming use-cases going through CsvExec. Perhaps we could introduce a FileStreamExec or something? Both could still make use of object_store and arrow-csv under the hood, but separating them would perhaps better accommodate divergent functionality like schema inference, parallel reads, late materialisation, etc. that doesn't work in the same way for streams?
   
   I dunno, just spitballing here, but it seems unfortunate to force a lowest common denominator on CsvExec, where it can't read byte ranges from files...
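   For context, the empty-range special case in the diff above can be sketched in plain Rust. This is a simplified illustration, not the PR's actual code: `adjusted_range` is a hypothetical helper, and the deltas (which in CsvOpener come from aligning the partition boundaries to newlines) are plain parameters here:
   
   ```rust
   use std::ops::Range;
   
   /// Hypothetical helper mirroring the new CsvOpener logic: shift the
   /// partition's byte range by the newline-alignment deltas and detect
   /// an empty slice.
   fn adjusted_range(start: u64, end: u64, start_delta: u64, end_delta: u64) -> Option<Range<u64>> {
       let range = start + start_delta..end + end_delta;
       // An empty half-open range must not be passed to the object store:
       // a `GetOptions` range with `start == end` can be interpreted as
       // "fetch the whole object", so the caller should instead return an
       // empty record-batch stream without issuing any request.
       if range.is_empty() {
           None
       } else {
           Some(range)
       }
   }
   
   fn main() {
       // Normal partition: a non-empty range is passed through.
       assert_eq!(adjusted_range(0, 100, 0, 0), Some(0..100));
       // Degenerate partition: the deltas collapse the range, so no GET is issued.
       assert_eq!(adjusted_range(100, 100, 0, 0), None);
   }
   ```
   
   The real code returns `futures::stream::poll_fn(move |_| Poll::Ready(None)).boxed()` in the `None` case, i.e. an already-exhausted stream, which also lets the `LocalFileSystem` type-check from the old code be dropped.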
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
