AdamGS commented on code in PR #14951:
URL: https://github.com/apache/datafusion/pull/14951#discussion_r1979201859
##########
datafusion/core/src/datasource/file_format/avro.rs:
##########
@@ -15,163 +15,31 @@
// specific language governing permissions and limitations
// under the License.
-//! [`AvroFormat`] Apache Avro [`FileFormat`] abstractions
-
-use std::any::Any;
-use std::collections::HashMap;
-use std::fmt;
-use std::sync::Arc;
-
-use super::file_compression_type::FileCompressionType;
-use super::FileFormat;
-use super::FileFormatFactory;
-use crate::datasource::avro_to_arrow::read_avro_schema_from_reader;
-use crate::datasource::physical_plan::AvroSource;
-use crate::error::Result;
-use crate::physical_plan::ExecutionPlan;
-use crate::physical_plan::Statistics;
-
-use arrow::datatypes::Schema;
-use arrow::datatypes::SchemaRef;
-use async_trait::async_trait;
-use datafusion_catalog::Session;
-use datafusion_common::internal_err;
-use datafusion_common::parsers::CompressionTypeVariant;
-use datafusion_common::GetExt;
-use datafusion_common::DEFAULT_AVRO_EXTENSION;
-use datafusion_datasource::file::FileSource;
-use datafusion_datasource::file_scan_config::FileScanConfig;
-use datafusion_physical_expr::PhysicalExpr;
-use object_store::{GetResultPayload, ObjectMeta, ObjectStore};
-
-#[derive(Default)]
-/// Factory struct used to create [AvroFormat]
-pub struct AvroFormatFactory;
-
-impl AvroFormatFactory {
- /// Creates an instance of [AvroFormatFactory]
- pub fn new() -> Self {
- Self {}
- }
-}
-
-impl FileFormatFactory for AvroFormatFactory {
- fn create(
- &self,
- _state: &dyn Session,
- _format_options: &HashMap<String, String>,
- ) -> Result<Arc<dyn FileFormat>> {
- Ok(Arc::new(AvroFormat))
- }
+//! Re-exports the [`datafusion_datasource_avro::file_format`] module, and contains tests for it.
- fn default(&self) -> Arc<dyn FileFormat> {
- Arc::new(AvroFormat)
- }
-
- fn as_any(&self) -> &dyn Any {
- self
- }
-}
-
-impl fmt::Debug for AvroFormatFactory {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("AvroFormatFactory").finish()
- }
-}
-
-impl GetExt for AvroFormatFactory {
- fn get_ext(&self) -> String {
- // Removes the dot, i.e. ".parquet" -> "parquet"
- DEFAULT_AVRO_EXTENSION[1..].to_string()
- }
-}
-
-/// Avro `FileFormat` implementation.
-#[derive(Default, Debug)]
-pub struct AvroFormat;
-
-#[async_trait]
-impl FileFormat for AvroFormat {
- fn as_any(&self) -> &dyn Any {
- self
- }
-
- fn get_ext(&self) -> String {
- AvroFormatFactory::new().get_ext()
- }
-
- fn get_ext_with_compression(
- &self,
- file_compression_type: &FileCompressionType,
- ) -> Result<String> {
- let ext = self.get_ext();
- match file_compression_type.get_variant() {
- CompressionTypeVariant::UNCOMPRESSED => Ok(ext),
- _ => internal_err!("Avro FileFormat does not support compression."),
- }
- }
-
- async fn infer_schema(
- &self,
- _state: &dyn Session,
- store: &Arc<dyn ObjectStore>,
- objects: &[ObjectMeta],
- ) -> Result<SchemaRef> {
- let mut schemas = vec![];
- for object in objects {
- let r = store.as_ref().get(&object.location).await?;
- let schema = match r.payload {
- GetResultPayload::File(mut file, _) => {
- read_avro_schema_from_reader(&mut file)?
- }
- GetResultPayload::Stream(_) => {
- // TODO: Fetching entire file to get schema is potentially wasteful
- let data = r.bytes().await?;
- read_avro_schema_from_reader(&mut data.as_ref())?
- }
- };
- schemas.push(schema);
- }
- let merged_schema = Schema::try_merge(schemas)?;
- Ok(Arc::new(merged_schema))
- }
-
- async fn infer_stats(
- &self,
- _state: &dyn Session,
- _store: &Arc<dyn ObjectStore>,
- table_schema: SchemaRef,
- _object: &ObjectMeta,
- ) -> Result<Statistics> {
- Ok(Statistics::new_unknown(&table_schema))
- }
-
- async fn create_physical_plan(
- &self,
- _state: &dyn Session,
- conf: FileScanConfig,
- _filters: Option<&Arc<dyn PhysicalExpr>>,
- ) -> Result<Arc<dyn ExecutionPlan>> {
- Ok(conf.with_source(self.file_source()).build())
- }
-
- fn file_source(&self) -> Arc<dyn FileSource> {
- Arc::new(AvroSource::new())
- }
-}
+pub use datafusion_datasource_avro::file_format::*;
#[cfg(test)]
-#[cfg(feature = "avro")]
mod tests {
Review Comment:
Oh, I didn't realize that was a thing. Glad to move everything there (in this
PR or a different one).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]