ozankabak commented on code in PR #7282:
URL: https://github.com/apache/arrow-datafusion/pull/7282#discussion_r1299227628
##########
datafusion/core/src/datasource/physical_plan/csv.rs:
##########
@@ -504,57 +447,41 @@ impl FileOpener for CsvOpener {
} else {
0
};
- (start + start_delta, end + end_delta)
+ let range = start + start_delta..end + end_delta;
+ if range.start == range.end {
+ return Ok(
+ futures::stream::poll_fn(move |_|
Poll::Ready(None)).boxed()
+ );
+ }
+ Some(range)
}
};
- // For special case: If `Range` has equal `start` and `end`,
object store will fetch
- // the whole file
- let localfs: Arc<dyn ObjectStore> =
Arc::new(LocalFileSystem::new());
- let is_localfs = localfs.type_id() ==
config.object_store.type_id();
- if start_byte == end_byte && !is_localfs {
- return Ok(futures::stream::poll_fn(move |_|
Poll::Ready(None)).boxed());
- }
-
let options = GetOptions {
- range: Some(Range {
- start: start_byte,
- end: end_byte,
- }),
+ range,
..Default::default()
};
-
- match config
+ let result = config
.object_store
.get_opts(file_meta.location(), options)
- .await?
- {
- GetResult::File(file, _) => {
+ .await?;
+
+ match result.payload {
+ GetResultPayload::File(mut file, _) => {
let is_whole_file_scanned = file_meta.range.is_none();
let decoder = if is_whole_file_scanned {
- // For special case: `get_range()` will interpret
`start` and `end` as the
- // byte range after decompression for compressed files
+ // Don't seek if no range as breaks FIFO files
Review Comment:
I will think about the `FileStreamExec` idea and discuss it with @metesynnada.
We may be reaching a point where taking such a step makes sense. We will
circle back once we have some clarity on our end.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]