alamb commented on code in PR #7282:
URL: https://github.com/apache/arrow-datafusion/pull/7282#discussion_r1294449058
##########
datafusion/core/src/datasource/physical_plan/csv.rs:
##########
@@ -504,57 +447,41 @@ impl FileOpener for CsvOpener {
} else {
0
};
- (start + start_delta, end + end_delta)
+ let range = start + start_delta..end + end_delta;
+ if range.start == range.end {
+ return Ok(
+ futures::stream::poll_fn(move |_|
Poll::Ready(None)).boxed()
+ );
+ }
+ Some(range)
}
};
- // For special case: If `Range` has equal `start` and `end`,
object store will fetch
- // the whole file
- let localfs: Arc<dyn ObjectStore> =
Arc::new(LocalFileSystem::new());
- let is_localfs = localfs.type_id() ==
config.object_store.type_id();
- if start_byte == end_byte && !is_localfs {
- return Ok(futures::stream::poll_fn(move |_|
Poll::Ready(None)).boxed());
- }
-
let options = GetOptions {
- range: Some(Range {
- start: start_byte,
- end: end_byte,
- }),
+ range,
..Default::default()
};
-
- match config
+ let result = config
.object_store
.get_opts(file_meta.location(), options)
- .await?
- {
- GetResult::File(file, _) => {
+ .await?;
+
+ match result.payload {
+ GetResultPayload::File(mut file, _) => {
let is_whole_file_scanned = file_meta.range.is_none();
let decoder = if is_whole_file_scanned {
- // For special case: `get_range()` will interpret
`start` and `end` as the
- // byte range after decompression for compressed files
+ // Don't seek if no range as breaks FIFO files
Review Comment:
FYI @metesynnada -- do you have thoughts about moving FIFO support behind a
more clearly separated boundary? Perhaps we could define a dedicated interface
that handles incremental streaming, and then implement FIFO support on top of
that interface 🤔
##########
datafusion/core/src/datasource/physical_plan/csv.rs:
##########
@@ -504,57 +447,41 @@ impl FileOpener for CsvOpener {
} else {
0
};
- (start + start_delta, end + end_delta)
+ let range = start + start_delta..end + end_delta;
+ if range.start == range.end {
+ return Ok(
+ futures::stream::poll_fn(move |_|
Poll::Ready(None)).boxed()
+ );
+ }
+ Some(range)
}
};
- // For special case: If `Range` has equal `start` and `end`,
object store will fetch
- // the whole file
- let localfs: Arc<dyn ObjectStore> =
Arc::new(LocalFileSystem::new());
- let is_localfs = localfs.type_id() ==
config.object_store.type_id();
- if start_byte == end_byte && !is_localfs {
- return Ok(futures::stream::poll_fn(move |_|
Poll::Ready(None)).boxed());
- }
-
let options = GetOptions {
- range: Some(Range {
- start: start_byte,
- end: end_byte,
- }),
+ range,
..Default::default()
};
-
- match config
+ let result = config
.object_store
.get_opts(file_meta.location(), options)
- .await?
- {
- GetResult::File(file, _) => {
+ .await?;
+
+ match result.payload {
+ GetResultPayload::File(mut file, _) => {
let is_whole_file_scanned = file_meta.range.is_none();
let decoder = if is_whole_file_scanned {
- // For special case: `get_range()` will interpret
`start` and `end` as the
- // byte range after decompression for compressed files
+ // Don't seek if no range as breaks FIFO files
Review Comment:
FYI @metesynnada -- do you have thoughts about moving FIFO support behind a
more clearly separated boundary? Perhaps we could define a dedicated interface
that handles incremental streaming, and then implement FIFO support on top of
that interface 🤔
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]