metesynnada commented on code in PR #7282:
URL: https://github.com/apache/arrow-datafusion/pull/7282#discussion_r1300234230
##########
datafusion/core/src/datasource/physical_plan/csv.rs:
##########
@@ -504,57 +447,41 @@ impl FileOpener for CsvOpener {
} else {
0
};
- (start + start_delta, end + end_delta)
+ let range = start + start_delta..end + end_delta;
+ if range.start == range.end {
+ return Ok(
+ futures::stream::poll_fn(move |_|
Poll::Ready(None)).boxed()
+ );
+ }
+ Some(range)
}
};
- // For special case: If `Range` has equal `start` and `end`,
object store will fetch
- // the whole file
- let localfs: Arc<dyn ObjectStore> =
Arc::new(LocalFileSystem::new());
- let is_localfs = localfs.type_id() ==
config.object_store.type_id();
- if start_byte == end_byte && !is_localfs {
- return Ok(futures::stream::poll_fn(move |_|
Poll::Ready(None)).boxed());
- }
-
let options = GetOptions {
- range: Some(Range {
- start: start_byte,
- end: end_byte,
- }),
+ range,
..Default::default()
};
-
- match config
+ let result = config
.object_store
.get_opts(file_meta.location(), options)
- .await?
- {
- GetResult::File(file, _) => {
+ .await?;
+
+ match result.payload {
+ GetResultPayload::File(mut file, _) => {
let is_whole_file_scanned = file_meta.range.is_none();
let decoder = if is_whole_file_scanned {
- // For special case: `get_range()` will interpret
`start` and `end` as the
- // byte range after decompression for compressed files
+ // Don't seek if no range as breaks FIFO files
Review Comment:
@ozankabak and I agree that implementing `FileStreamExec` would be a
logical choice. We plan on developing a proof of concept for it next week and
sharing a design document.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org