martin-g commented on code in PR #18457:
URL: https://github.com/apache/datafusion/pull/18457#discussion_r2509737132
##########
datafusion/datasource-arrow/src/source.rs:
##########
@@ -129,13 +143,165 @@ impl FileSource for ArrowSource {
}
}
-/// The struct arrow that implements `[FileOpener]` trait
-pub struct ArrowOpener {
- pub object_store: Arc<dyn ObjectStore>,
- pub projection: Option<Vec<usize>>,
+/// `FileSource` for Arrow IPC stream format. Supports only sequential reading.
+#[derive(Clone)]
+pub(crate) struct ArrowStreamFileSource {
+ table_schema: TableSchema,
+ metrics: ExecutionPlanMetricsSet,
+ projected_statistics: Option<Statistics>,
+ schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
}
-impl FileOpener for ArrowOpener {
+impl ArrowStreamFileSource {
+ /// Initialize an ArrowStreamFileSource with the provided schema
+ pub fn new(table_schema: impl Into<TableSchema>) -> Self {
+ Self {
+ table_schema: table_schema.into(),
+ metrics: ExecutionPlanMetricsSet::new(),
+ projected_statistics: None,
+ schema_adapter_factory: None,
+ }
+ }
+}
+
+impl From<ArrowStreamFileSource> for Arc<dyn FileSource> {
+ fn from(source: ArrowStreamFileSource) -> Self {
+ as_file_source(source)
+ }
+}
+
+impl FileSource for ArrowStreamFileSource {
+ fn create_file_opener(
+ &self,
+ object_store: Arc<dyn ObjectStore>,
+ base_config: &FileScanConfig,
+ _partition: usize,
+ ) -> Arc<dyn FileOpener> {
+ Arc::new(ArrowStreamFileOpener {
+ object_store,
+ projection: base_config.file_column_projection_indices(),
+ })
+ }
+
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
+ Arc::new(Self { ..self.clone() })
Review Comment:
```suggestion
Arc::new(self.clone())
```
##########
datafusion/datasource-arrow/src/file_format.rs:
##########
@@ -151,12 +153,18 @@ impl FileFormat for ArrowFormat {
let schema = match r.payload {
#[cfg(not(target_arch = "wasm32"))]
GetResultPayload::File(mut file, _) => {
- let reader = FileReader::try_new(&mut file, None)?;
- reader.schema()
- }
- GetResultPayload::Stream(stream) => {
- infer_schema_from_file_stream(stream).await?
+ match FileReader::try_new(&mut file, None) {
+ Ok(reader) => reader.schema(),
+ Err(_) => {
+ // not in the file format, but FileReader read
some bytes
+ // while trying to parse the file and so we need
to rewind
+ // it to the beginning of the file
+ file.seek(SeekFrom::Start(0))?;
+ StreamReader::try_new(&mut file, None)?.schema()
Review Comment:
It might be better to combine the errors from FileReader and StreamReader if
both fail. Currently only the error from StreamReader is reported, but the
input might be a plain Arrow file rather than a stream file, in which case the
real error (the one from FileReader) is discarded.
##########
datafusion/datasource-arrow/src/source.rs:
##########
@@ -129,13 +143,165 @@ impl FileSource for ArrowSource {
}
}
-/// The struct arrow that implements `[FileOpener]` trait
-pub struct ArrowOpener {
- pub object_store: Arc<dyn ObjectStore>,
- pub projection: Option<Vec<usize>>,
+/// `FileSource` for Arrow IPC stream format. Supports only sequential reading.
+#[derive(Clone)]
+pub(crate) struct ArrowStreamFileSource {
+ table_schema: TableSchema,
+ metrics: ExecutionPlanMetricsSet,
+ projected_statistics: Option<Statistics>,
+ schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
}
-impl FileOpener for ArrowOpener {
+impl ArrowStreamFileSource {
+ /// Initialize an ArrowStreamFileSource with the provided schema
+ pub fn new(table_schema: impl Into<TableSchema>) -> Self {
+ Self {
+ table_schema: table_schema.into(),
+ metrics: ExecutionPlanMetricsSet::new(),
+ projected_statistics: None,
+ schema_adapter_factory: None,
+ }
+ }
+}
+
+impl From<ArrowStreamFileSource> for Arc<dyn FileSource> {
+ fn from(source: ArrowStreamFileSource) -> Self {
+ as_file_source(source)
+ }
+}
+
+impl FileSource for ArrowStreamFileSource {
+ fn create_file_opener(
+ &self,
+ object_store: Arc<dyn ObjectStore>,
+ base_config: &FileScanConfig,
+ _partition: usize,
+ ) -> Arc<dyn FileOpener> {
+ Arc::new(ArrowStreamFileOpener {
+ object_store,
+ projection: base_config.file_column_projection_indices(),
+ })
+ }
+
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
+ Arc::new(Self { ..self.clone() })
+ }
+
+ fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
+ let mut conf = self.clone();
+ conf.projected_statistics = Some(statistics);
+ Arc::new(conf)
+ }
+
+ fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource>
{
+ Arc::new(Self { ..self.clone() })
Review Comment:
```suggestion
Arc::new(self.clone())
```
##########
datafusion/datasource-arrow/src/source.rs:
##########
@@ -129,13 +143,165 @@ impl FileSource for ArrowSource {
}
}
-/// The struct arrow that implements `[FileOpener]` trait
-pub struct ArrowOpener {
- pub object_store: Arc<dyn ObjectStore>,
- pub projection: Option<Vec<usize>>,
+/// `FileSource` for Arrow IPC stream format. Supports only sequential reading.
+#[derive(Clone)]
+pub(crate) struct ArrowStreamFileSource {
+ table_schema: TableSchema,
+ metrics: ExecutionPlanMetricsSet,
+ projected_statistics: Option<Statistics>,
+ schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
}
-impl FileOpener for ArrowOpener {
+impl ArrowStreamFileSource {
+ /// Initialize an ArrowStreamFileSource with the provided schema
+ pub fn new(table_schema: impl Into<TableSchema>) -> Self {
+ Self {
+ table_schema: table_schema.into(),
+ metrics: ExecutionPlanMetricsSet::new(),
+ projected_statistics: None,
+ schema_adapter_factory: None,
+ }
+ }
+}
+
+impl From<ArrowStreamFileSource> for Arc<dyn FileSource> {
+ fn from(source: ArrowStreamFileSource) -> Self {
+ as_file_source(source)
+ }
+}
+
+impl FileSource for ArrowStreamFileSource {
+ fn create_file_opener(
+ &self,
+ object_store: Arc<dyn ObjectStore>,
+ base_config: &FileScanConfig,
+ _partition: usize,
+ ) -> Arc<dyn FileOpener> {
+ Arc::new(ArrowStreamFileOpener {
+ object_store,
+ projection: base_config.file_column_projection_indices(),
+ })
+ }
+
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
+ Arc::new(Self { ..self.clone() })
+ }
+
+ fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
+ let mut conf = self.clone();
+ conf.projected_statistics = Some(statistics);
+ Arc::new(conf)
+ }
+
+ fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource>
{
+ Arc::new(Self { ..self.clone() })
+ }
+
+ fn repartitioned(
+ &self,
+ _target_partitions: usize,
+ _repartition_file_min_size: usize,
+ _output_ordering: Option<LexOrdering>,
+ _config: &FileScanConfig,
+ ) -> Result<Option<FileScanConfig>> {
+ // The Arrow IPC stream format doesn't support range-based parallel
reading
+ // because it lacks a footer with the information that would be needed
to
+ // make range-based parallel reading practical. Without the data in the
+ // footer you would either need to read the entire file and record
the
+ // offsets of the record batches and dictionaries, essentially
recreating
+ // the footer's contents, or else each partition would need to read the
+ // entire file up to the correct offset which is a lot of duplicate
I/O.
+ // We're opting to avoid that entirely by only acting on a single
partition
+ // and reading sequentially.
+ Ok(None)
+ }
+
+ fn metrics(&self) -> &ExecutionPlanMetricsSet {
+ &self.metrics
+ }
+
+ fn statistics(&self) -> Result<Statistics> {
+ let statistics = &self.projected_statistics;
+ Ok(statistics
+ .clone()
+ .expect("projected_statistics must be set"))
+ }
+
+ fn file_type(&self) -> &str {
+ "arrow_stream"
+ }
+
+ fn with_schema_adapter_factory(
+ &self,
+ schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+ ) -> Result<Arc<dyn FileSource>> {
+ Ok(Arc::new(Self {
+ schema_adapter_factory: Some(schema_adapter_factory),
+ ..self.clone()
+ }))
+ }
+
+ fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
+ self.schema_adapter_factory.clone()
+ }
+
+ fn table_schema(&self) -> &TableSchema {
+ &self.table_schema
+ }
+}
+
+/// `FileOpener` for Arrow IPC stream format. Supports only sequential reading.
+pub(crate) struct ArrowStreamFileOpener {
+ object_store: Arc<dyn ObjectStore>,
+ projection: Option<Vec<usize>>,
+}
+
+impl FileOpener for ArrowStreamFileOpener {
+ fn open(&self, partitioned_file: PartitionedFile) ->
Result<FileOpenFuture> {
+ if partitioned_file.range.is_some() {
+ return Err(exec_datafusion_err!(
+ "ArrowStreamOpener does not support range-based reading"
Review Comment:
```suggestion
"ArrowStreamFileOpener does not support range-based reading"
```
##########
datafusion/datasource-arrow/src/file_format.rs:
##########
@@ -176,14 +184,33 @@ impl FileFormat for ArrowFormat {
async fn create_physical_plan(
&self,
- _state: &dyn Session,
+ state: &dyn Session,
conf: FileScanConfig,
) -> Result<Arc<dyn ExecutionPlan>> {
+ let object_store =
state.runtime_env().object_store(&conf.object_store_url)?;
+ let object_location = &conf
+ .file_groups
+ .first()
+ .ok_or_else(|| internal_datafusion_err!("No files found in file
group"))?
+ .files()
+ .first()
Review Comment:
This assumes that all file groups, and all files in them, are always of the
same type - either all plain files or all stream files.
Is this always the case?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]