devinjdangelo commented on code in PR #11060:
URL: https://github.com/apache/datafusion/pull/11060#discussion_r1650270938


##########
datafusion/common/src/config.rs:
##########
@@ -1116,6 +1116,16 @@ macro_rules! extensions_options {
     }
 }
 
+/// These file types have special built in behavior for configuration.
+/// Use TableOptions::Extensions for configuring other file types.
+#[derive(Debug, Clone)]
+pub enum ConfigFileType {

Review Comment:
   We could potentially remove the `TableOptions` code entirely and have each 
`FileFormatFactory` handle its own configuration. That is already mostly the 
case in this PR, but `TableOptions` remains as a common helper.
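   A rough sketch of that direction (the names and the `with_options` signature 
here are illustrative only, not the actual `FileFormatFactory` API): each 
factory parses only the option keys it understands, with no shared 
`TableOptions` in between.
   
```rust
use std::collections::HashMap;

/// Hypothetical CSV factory that owns its configuration parsing.
#[derive(Debug)]
struct CsvFactorySketch {
    delimiter: u8,
    has_header: bool,
}

impl CsvFactorySketch {
    /// Interpret raw `OPTIONS (..)` key/value pairs locally instead of routing
    /// them through a shared TableOptions helper.
    fn with_options(options: &HashMap<String, String>) -> Result<Self, String> {
        let mut factory = CsvFactorySketch { delimiter: b',', has_header: true };
        for (key, value) in options {
            match key.as_str() {
                "delimiter" => {
                    factory.delimiter =
                        *value.as_bytes().first().ok_or("delimiter must not be empty")?
                }
                "has_header" => {
                    factory.has_header =
                        value.parse().map_err(|e| format!("invalid has_header: {e}"))?
                }
                other => return Err(format!("unknown CSV option: {other}")),
            }
        }
        Ok(factory)
    }
}
```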



##########
datafusion/expr/src/logical_plan/dml.rs:
##########
@@ -35,8 +35,8 @@ pub struct CopyTo {
     pub output_url: String,
     /// Determines which, if any, columns should be used for hive-style partitioned writes
     pub partition_by: Vec<String>,
-    /// File format options.
-    pub format_options: FormatOptions,
+    /// File type trait
+    pub file_type: Arc<dyn FileType>,

Review Comment:
   This would introduce a dependency between datafusion-expr and 
datafusion-core. The `FileType` trait exists to avoid that dependency, so that 
users who only use DataFusion for logical planning do not have to implement 
all of the physical-execution-related `FileFormat` methods.
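   To make the layering concrete (these are simplified stand-ins, not the real 
trait definitions): the logical plan only needs a small trait object, which can 
live in a low-level crate, while the execution-oriented trait stays in 
datafusion-core.
   
```rust
use std::any::Any;
use std::fmt::{Debug, Display};
use std::sync::Arc;

/// Stand-in for the lightweight trait next to the logical plan
/// (conceptually datafusion-common / datafusion-expr).
trait FileTypeSketch: Debug + Display + Send + Sync {
    fn as_any(&self) -> &dyn Any;
}

/// Stand-in for the execution-oriented trait (conceptually datafusion-core);
/// implementing it would also require schema inference, statistics, and
/// physical write plans.
trait FileFormatSketch: Debug + Send + Sync {
    fn get_ext(&self) -> String;
}

/// CopyTo then stores only the lightweight trait object, so crates that stop
/// at logical planning never have to implement FileFormatSketch.
struct CopyToSketch {
    output_url: String,
    file_type: Arc<dyn FileTypeSketch>,
}
```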



##########
datafusion/core/src/datasource/file_format/mod.rs:
##########
@@ -58,6 +74,15 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
     /// downcast to a specific implementation.
     fn as_any(&self) -> &dyn Any;
 
+    /// Returns the extension for this FileFormat, e.g. "file.csv" -> csv
+    fn get_ext(&self) -> String;

Review Comment:
   I think `&str` would be too restrictive, but we could avoid requiring a 
`String` allocation by returning a `Cow` instead.
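   For example, a `Cow`-based signature (an illustration of the idea, not the 
final API) keeps the common fixed-extension case allocation-free while still 
allowing owned values:
   
```rust
use std::borrow::Cow;

trait GetExtSketch {
    fn get_ext(&self) -> Cow<'static, str>;
}

struct CsvSketch;

impl GetExtSketch for CsvSketch {
    fn get_ext(&self) -> Cow<'static, str> {
        // Fixed extension: no allocation needed.
        Cow::Borrowed("csv")
    }
}

struct CompressedCsvSketch {
    compression_ext: String, // e.g. ".gz"
}

impl GetExtSketch for CompressedCsvSketch {
    fn get_ext(&self) -> Cow<'static, str> {
        // Computed extension: allocate only when it is actually dynamic.
        Cow::Owned(format!("csv{}", self.compression_ext))
    }
}
```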



##########
datafusion/common/src/config.rs:
##########
@@ -1249,6 +1246,15 @@ impl TableOptions {
         clone
     }
 
+    /// Sets the file format for the table.
+    ///
+    /// # Parameters
+    ///
+    /// * `format`: The file format to use (e.g., CSV, Parquet).
+    pub fn set_file_format(&mut self, format: ConfigFileType) {

Review Comment:
   I changed the method name for now.
   
   It might make sense for `TableOptions` to be broken up and its functionality 
moved into each implementation of `FileFormatFactory`. I have avoided any 
reference to `FileFormat` in datafusion-common, since that would add an 
indirect dependency on `datafusion-core` to `datafusion-sql` and other crates.



##########
datafusion/proto/src/logical_plan/file_formats.rs:
##########
@@ -0,0 +1,399 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use datafusion::{
+    datasource::file_format::{
+        arrow::ArrowFormatFactory, csv::CsvFormatFactory, json::JsonFormatFactory,
+        parquet::ParquetFormatFactory, FileFormatFactory,
+    },
+    prelude::SessionContext,
+};
+use datafusion_common::not_impl_err;
+
+use super::LogicalExtensionCodec;
+
+#[derive(Debug)]
+pub struct CsvLogicalExtensionCodec;
+
+// TODO! This is a placeholder for now and needs to be implemented for real.
+impl LogicalExtensionCodec for CsvLogicalExtensionCodec {
+    fn try_decode(
+        &self,
+        _buf: &[u8],
+        _inputs: &[datafusion_expr::LogicalPlan],
+        _ctx: &datafusion::prelude::SessionContext,
+    ) -> datafusion_common::Result<datafusion_expr::Extension> {
+        not_impl_err!("Method not implemented")
+    }
+
+    fn try_encode(
+        &self,
+        _node: &datafusion_expr::Extension,
+        _buf: &mut Vec<u8>,
+    ) -> datafusion_common::Result<()> {
+        not_impl_err!("Method not implemented")
+    }
+
+    fn try_decode_table_provider(
+        &self,
+        _buf: &[u8],
+        _schema: arrow::datatypes::SchemaRef,
+        _ctx: &datafusion::prelude::SessionContext,
+    ) -> datafusion_common::Result<
+        std::sync::Arc<dyn datafusion::datasource::TableProvider>,
+    > {
+        not_impl_err!("Method not implemented")
+    }
+
+    fn try_encode_table_provider(
+        &self,
+        _node: std::sync::Arc<dyn datafusion::datasource::TableProvider>,
+        _buf: &mut Vec<u8>,
+    ) -> datafusion_common::Result<()> {
+        not_impl_err!("Method not implemented")
+    }
+
+    fn try_decode_file_format(
+        &self,
+        __buf: &[u8],
+        __ctx: &SessionContext,
+    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
+        Ok(Arc::new(CsvFormatFactory::new()))
+    }
+
+    fn try_encode_file_format(
+        &self,
+        __buf: &[u8],
+        __node: Arc<dyn FileFormatFactory>,
+    ) -> datafusion_common::Result<()> {
+        Ok(())
+    }
+
+    fn try_decode_udf(
+        &self,
+        name: &str,
+        __buf: &[u8],
+    ) -> datafusion_common::Result<Arc<datafusion_expr::ScalarUDF>> {
+        not_impl_err!("LogicalExtensionCodec is not provided for scalar function {name}")
+    }
+
+    fn try_encode_udf(
+        &self,
+        __node: &datafusion_expr::ScalarUDF,
+        __buf: &mut Vec<u8>,
+    ) -> datafusion_common::Result<()> {
+        Ok(())
+    }
+}
+
+#[derive(Debug)]
+pub struct JsonLogicalExtensionCodec;
+
+// TODO! This is a placeholder for now and needs to be implemented for real.

Review Comment:
   Users who depend on the ability to serialize COPY plans (e.g. Ballista) will 
need this TODO to be completed before upgrading to any version of DataFusion 
that includes this change.
   
   I think it would be OK to cut a follow-up ticket for this.
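   As a rough illustration of what the follow-up has to cover (ad-hoc encoding 
for the sketch only; the real implementation would presumably go through the 
protobuf definitions in datafusion-proto), the codec needs to round-trip the 
format-specific options through the byte buffer instead of returning a 
default-configured factory:
   
```rust
use std::collections::HashMap;

/// Serialize format options into the codec's byte buffer.
fn encode_format_options(options: &HashMap<String, String>, buf: &mut Vec<u8>) {
    for (key, value) in options {
        buf.extend_from_slice(format!("{key}={value}\n").as_bytes());
    }
}

/// Recover the options on the decode side so the rebuilt factory matches the
/// one that was originally planned, rather than a default-configured one.
fn decode_format_options(buf: &[u8]) -> HashMap<String, String> {
    String::from_utf8_lossy(buf)
        .lines()
        .filter_map(|line| {
            let (key, value) = line.split_once('=')?;
            Some((key.to_string(), value.to_string()))
        })
        .collect()
}
```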



##########
datafusion/core/src/datasource/file_format/mod.rs:
##########
@@ -106,6 +131,67 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
     }
 }
 
+/// A container of [FileFormatFactory] which also implements [FileType].
+/// This enables converting a dyn FileFormat to a dyn FileType.
+/// The former trait is a superset of the latter trait, which includes execution time
+/// relevant methods. [FileType] is only used in logical planning and only implements
+/// the subset of methods required during logical planning.
+pub struct DefaultFileFormat {

Review Comment:
   I renamed this to `DefaultFileType`.



##########
datafusion/core/src/datasource/file_format/file_compression_type.rs:
##########
@@ -245,90 +250,16 @@ pub trait FileTypeExt {
     fn get_ext_with_compression(&self, c: FileCompressionType) -> Result<String>;
 }
 
-impl FileTypeExt for FileType {
-    fn get_ext_with_compression(&self, c: FileCompressionType) -> Result<String> {
-        let ext = self.get_ext();
-
-        match self {
-            FileType::JSON | FileType::CSV => Ok(format!("{}{}", ext, c.get_ext())),
-            FileType::AVRO | FileType::ARROW => match c.variant {
-                UNCOMPRESSED => Ok(ext),
-                _ => Err(DataFusionError::Internal(
-                    "FileCompressionType can be specified for CSV/JSON FileType.".into(),
-                )),
-            },
-            #[cfg(feature = "parquet")]
-            FileType::PARQUET => match c.variant {
-                UNCOMPRESSED => Ok(ext),
-                _ => Err(DataFusionError::Internal(
-                    "FileCompressionType can be specified for CSV/JSON FileType.".into(),
-                )),
-            },
-        }
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use std::str::FromStr;
 
-    use crate::datasource::file_format::file_compression_type::{
-        FileCompressionType, FileTypeExt,
-    };
+    use crate::datasource::file_format::file_compression_type::FileCompressionType;
     use crate::error::DataFusionError;
 
-    use datafusion_common::file_options::file_type::FileType;
-
     use bytes::Bytes;
     use futures::StreamExt;
 
-    #[test]
-    fn get_ext_with_compression() {

Review Comment:
   Yes, I moved this functionality to a trait method on each 
`FileFormatFactory`. We could add a similar test for each format.
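   For example, something along these lines (a sketch only: it assumes the 
method kept the `get_ext_with_compression(&self, c: FileCompressionType) -> 
Result<String>` shape shown above and is callable on `CsvFormat`; adjust the 
import to wherever the trait landed):
   
```rust
use crate::datasource::file_format::csv::CsvFormat;

#[test]
fn csv_get_ext_with_compression() -> Result<(), DataFusionError> {
    let format = CsvFormat::default();

    // Compressed variants append the compression suffix to the base extension.
    let ext = format.get_ext_with_compression(FileCompressionType::GZIP)?;
    assert!(ext.ends_with(".gz"));

    // The uncompressed variant succeeds and carries no compression suffix.
    let ext = format.get_ext_with_compression(FileCompressionType::UNCOMPRESSED)?;
    assert!(!ext.ends_with(".gz"));

    Ok(())
}
```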



##########
datafusion/core/src/datasource/file_format/mod.rs:
##########
@@ -106,6 +131,67 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
     }
 }
 
+/// A container of [FileFormatFactory] which also implements [FileType].
+/// This enables converting a dyn FileFormat to a dyn FileType.
+/// The former trait is a superset of the latter trait, which includes execution time
+/// relevant methods. [FileType] is only used in logical planning and only implements
+/// the subset of methods required during logical planning.
+pub struct DefaultFileFormat {
+    file_format_factory: Arc<dyn FileFormatFactory>,
+}
+
+impl DefaultFileFormat {
+    /// Constructs a [DefaultFileFormat] wrapper from a [FileFormatFactory]
+    pub fn new(file_format_factory: Arc<dyn FileFormatFactory>) -> Self {
+        Self {
+            file_format_factory,
+        }
+    }
+}
+
+impl FileType for DefaultFileFormat {

Review Comment:
   I tried briefly, but ran into various errors. I am not sure it is possible 
to make this work without a wrapper, given that the two traits are defined in 
different crates. I think that is why `DefaultTableSource` exists as well.


