This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 81d9d8869c refactor: rename FileStream.file_reader to file_opener & update doc (#8883)
81d9d8869c is described below
commit 81d9d8869c16d878375faeefb1b99f8cbd323785
Author: SteveLauC <[email protected]>
AuthorDate: Thu Jan 18 04:26:38 2024 +0800
refactor: rename FileStream.file_reader to file_opener & update doc (#8883)
---
.../core/src/datasource/physical_plan/file_stream.rs | 14 ++++++--------
1 file changed, 6 insertions(+), 8 deletions(-)
diff --git a/datafusion/core/src/datasource/physical_plan/file_stream.rs b/datafusion/core/src/datasource/physical_plan/file_stream.rs
index 3536623976..9cb58e7032 100644
--- a/datafusion/core/src/datasource/physical_plan/file_stream.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs
@@ -83,11 +83,9 @@ pub struct FileStream<F: FileOpener> {
projected_schema: SchemaRef,
/// The remaining number of records to parse, None if no limit
remain: Option<usize>,
- /// A closure that takes a reader and an optional remaining number of lines
- /// (before reaching the limit) and returns a batch iterator. If the file reader
- /// is not capable of limiting the number of records in the last batch, the file
- /// stream will take care of truncating it.
- file_reader: F,
+ /// A generic [`FileOpener`]. Calling `open()` returns a [`FileOpenFuture`],
+ /// which can be resolved to a stream of `RecordBatch`.
+ file_opener: F,
/// The partition column projector
pc_projector: PartitionColumnProjector,
/// The stream state
@@ -250,7 +248,7 @@ impl<F: FileOpener> FileStream<F> {
pub fn new(
config: &FileScanConfig,
partition: usize,
- file_reader: F,
+ file_opener: F,
metrics: &ExecutionPlanMetricsSet,
) -> Result<Self> {
let (projected_schema, ..) = config.project();
@@ -269,7 +267,7 @@ impl<F: FileOpener> FileStream<F> {
file_iter: files.into(),
projected_schema,
remain: config.limit,
- file_reader,
+ file_opener,
pc_projector,
state: FileStreamState::Idle,
file_stream_metrics: FileStreamMetrics::new(metrics, partition),
@@ -301,7 +299,7 @@ impl<F: FileOpener> FileStream<F> {
};
Some(
- self.file_reader
+ self.file_opener
.open(file_meta)
.map(|future| (future, part_file.partition_values)),
)
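
For readers unfamiliar with the renamed field: the `FileOpener` described in the new doc comment is the hook through which `FileStream` asynchronously opens each file and obtains a stream of record batches. The sketch below illustrates that open() -> future -> batch-stream flow using hypothetical stand-in types; the real DataFusion items (`FileOpener`, `FileOpenFuture`, `FileMeta`) differ in detail across versions, so treat this as an illustration rather than the actual API. It only depends on the `futures` crate, and the file name passed to `open()` is purely illustrative.

    // Self-contained sketch of the FileOpener flow described in the new doc
    // comment. The trait and type aliases below are hypothetical stand-ins for
    // DataFusion's FileOpener / FileOpenFuture / FileMeta, whose exact
    // signatures vary between versions.
    use std::pin::Pin;

    use futures::{stream, Future, Stream, StreamExt};

    // Placeholder for arrow::record_batch::RecordBatch.
    type RecordBatch = Vec<u64>;

    // A future that resolves to a stream of record batches (stand-in for
    // DataFusion's FileOpenFuture alias).
    type FileOpenFuture =
        Pin<Box<dyn Future<Output = Pin<Box<dyn Stream<Item = RecordBatch> + Send>>> + Send>>;

    trait FileOpener {
        // Open one file and return a future resolving to its batch stream.
        fn open(&self, path: &str) -> FileOpenFuture;
    }

    struct DummyOpener;

    impl FileOpener for DummyOpener {
        fn open(&self, _path: &str) -> FileOpenFuture {
            Box::pin(async move {
                // A real opener would open the file and decode it lazily here.
                let batches = vec![vec![1, 2, 3], vec![4, 5]];
                stream::iter(batches).boxed()
            })
        }
    }

    fn main() {
        futures::executor::block_on(async {
            let opener = DummyOpener;
            // "part-0.parquet" is a made-up name; DummyOpener never touches disk.
            let mut batches = opener.open("part-0.parquet").await;
            while let Some(batch) = batches.next().await {
                println!("got a batch with {} rows", batch.len());
            }
        });
    }

This mirrors the structure the renamed field documents: `FileStream` holds an opener, calls `open()` per file, awaits the returned future, and then drains the resulting batch stream (truncating against the remaining limit as the surrounding code shows).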