corasaurus-hex commented on code in PR #18457:
URL: https://github.com/apache/datafusion/pull/18457#discussion_r2501558640
##########
datafusion/datasource-arrow/src/source.rs:
##########
@@ -116,13 +117,150 @@ impl FileSource for ArrowSource {
}
}
-/// The struct arrow that implements `[FileOpener]` trait
-pub struct ArrowOpener {
+/// Arrow IPC Stream format source - supports only sequential reading
+///
+/// The IPC stream format has no footer that would make range-based seeking
+/// practical, so this source never splits a file into byte ranges
+/// (`repartitioned` always returns `Ok(None)`); each file is read
+/// sequentially by a single partition.
+#[derive(Clone, Default)]
+pub struct ArrowStreamSource {
+ /// Execution-plan metrics set returned by `FileSource::metrics`.
+ metrics: ExecutionPlanMetricsSet,
+ /// Statistics supplied through `with_statistics`; `None` until then.
+ projected_statistics: Option<Statistics>,
+ /// Optional schema adapter factory.
+ /// NOTE(review): not read by any method in this hunk — confirm it is
+ /// wired up by the schema-adapter accessors later in the impl.
+ schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
+}
+
+/// Lets an `ArrowStreamSource` be used wherever a shared
+/// `Arc<dyn FileSource>` is expected.
+impl From<ArrowStreamSource> for Arc<dyn FileSource> {
+    fn from(stream_source: ArrowStreamSource) -> Self {
+        as_file_source(stream_source)
+    }
+}
+
+impl FileSource for ArrowStreamSource {
+    /// Builds an `ArrowStreamOpener` that reads through `object_store` and
+    /// applies the file-column projection taken from `base_config`.
+    ///
+    /// The partition index is unused: every partition receives an opener
+    /// configured the same way.
+    fn create_file_opener(
+        &self,
+        object_store: Arc<dyn ObjectStore>,
+        base_config: &FileScanConfig,
+        _partition: usize,
+    ) -> Arc<dyn FileOpener> {
+        let projection = base_config.file_column_projection_indices();
+        let opener = ArrowStreamOpener {
+            object_store,
+            projection,
+        };
+        Arc::new(opener)
+    }
+
+    /// Returns `self` as `&dyn Any` so callers can downcast to `ArrowStreamSource`.
+    fn as_any(&self) -> &dyn Any {
+        self as &dyn Any
+    }
+
+    /// Returns an unchanged copy of this source.
+    ///
+    /// The requested batch size is ignored: `ArrowStreamSource` has no
+    /// batch-size field to store it in.
+    fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
+        Arc::new(self.clone())
+    }
+
+    /// Returns an unchanged copy of this source.
+    ///
+    /// The table schema is ignored: `ArrowStreamSource` has no schema field
+    /// to store it in.
+    fn with_schema(&self, _schema: TableSchema) -> Arc<dyn FileSource> {
+        Arc::new(self.clone())
+    }
+
+    /// Returns a copy of this source whose projected statistics are `statistics`.
+    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
+        Arc::new(Self {
+            projected_statistics: Some(statistics),
+            ..self.clone()
+        })
+    }
+
+    /// Returns an unchanged copy of this source.
+    ///
+    /// The projection is not stored here; `create_file_opener` reads it from
+    /// the `FileScanConfig` it is given instead.
+    fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
+        Arc::new(self.clone())
+    }
+
+    /// Never repartitions: always returns `Ok(None)` so the existing file
+    /// grouping is kept as-is.
+    fn repartitioned(
+        &self,
+        _target_partitions: usize,
+        _repartition_file_min_size: usize,
+        _output_ordering: Option<datafusion_physical_expr_common::sort_expr::LexOrdering>,
+        _config: &FileScanConfig,
+    ) -> Result<Option<FileScanConfig>> {
+        // The IPC stream format has no footer, so byte-range seeking is not
+        // practical. Splitting a file would require either reading and indexing
+        // the whole file up front, or having every partition re-read the file up
+        // to its starting offset (a lot of duplicate I/O). To avoid both, each
+        // file stays in a single partition and is read sequentially.
+        Ok(None)
+    }
+
+    /// Returns this source's execution-plan metrics set.
+    fn metrics(&self) -> &ExecutionPlanMetricsSet {
+        let Self { metrics, .. } = self;
+        metrics
+    }
+
+ fn statistics(&self) -> Result<Statistics> {
+ let statistics = &self.projected_statistics;
+ Ok(statistics
+ .clone()
+ .expect("projected_statistics must be set"))
Review Comment:
This is done in
[`json`](https://github.com/corasaurus-hex/datafusion/blob/dfba22862da6cbd59537edee963f5bce55bd7aa2/datafusion/datasource-json/src/source.rs#L143-L148),
[`csv`](https://github.com/corasaurus-hex/datafusion/blob/dfba22862da6cbd59537edee963f5bce55bd7aa2/datafusion/datasource-csv/src/source.rs#L282-L287),
[`avro`](https://github.com/corasaurus-hex/datafusion/blob/dfba22862da6cbd59537edee963f5bce55bd7aa2/datafusion/datasource-avro/src/source.rs#L111-L116),
[`parquet`](https://github.com/corasaurus-hex/datafusion/blob/e4f6a144ac652f3eac4fe3dfebe3649c32fb19de/datafusion/datasource-parquet/src/source.rs#L624-L628),
and this file for the Arrow file format. I don't fully understand the
implications, but since this pattern is used in every instance I can find, I'm
fine leaving it for the moment and fixing it if we have a problem later.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]