milenkovicm commented on code in PR #20638:
URL: https://github.com/apache/datafusion/pull/20638#discussion_r2892071579


##########
datafusion/proto/src/logical_plan/mod.rs:
##########
@@ -207,6 +207,102 @@ impl LogicalExtensionCodec for 
DefaultLogicalExtensionCodec {
     ) -> Result<()> {
         not_impl_err!("LogicalExtensionCodec is not provided")
     }
+
+    fn try_decode_file_format(
+        &self,
+        buf: &[u8],
+        ctx: &TaskContext,
+    ) -> Result<Arc<dyn FileFormatFactory>> {
+        use prost::Message;
+
+        let proto = protobuf::FileFormatProto::decode(buf).map_err(|e| {
+            internal_datafusion_err!("Failed to decode FileFormatProto: {e}")
+        })?;
+
+        let kind = protobuf::FileFormatKind::try_from(proto.kind).map_err(|_| {
+            internal_datafusion_err!("Unknown FileFormatKind: {}", proto.kind)
+        })?;
+
+        match kind {
+            protobuf::FileFormatKind::Csv => 
file_formats::CsvLogicalExtensionCodec
+                .try_decode_file_format(&proto.options, ctx),
+            protobuf::FileFormatKind::Json => 
file_formats::JsonLogicalExtensionCodec
+                .try_decode_file_format(&proto.options, ctx),
+            #[cfg(feature = "parquet")]
+            protobuf::FileFormatKind::Parquet => {
+                file_formats::ParquetLogicalExtensionCodec
+                    .try_decode_file_format(&proto.options, ctx)
+            }
+            protobuf::FileFormatKind::Arrow => 
file_formats::ArrowLogicalExtensionCodec
+                .try_decode_file_format(&proto.options, ctx),
+            protobuf::FileFormatKind::Avro => 
file_formats::AvroLogicalExtensionCodec
+                .try_decode_file_format(&proto.options, ctx),
+            #[cfg(not(feature = "parquet"))]
+            protobuf::FileFormatKind::Parquet => {
+                not_impl_err!("Parquet support requires the 'parquet' feature")
+            }
+            protobuf::FileFormatKind::Unspecified => {
+                not_impl_err!("Unspecified file format kind")
+            }
+        }
+    }
+
+    fn try_encode_file_format(
+        &self,
+        buf: &mut Vec<u8>,
+        node: Arc<dyn FileFormatFactory>,
+    ) -> Result<()> {
+        use datafusion_datasource_arrow::file_format::ArrowFormatFactory;
+        use datafusion_datasource_csv::file_format::CsvFormatFactory;
+        use datafusion_datasource_json::file_format::JsonFormatFactory;
+        use prost::Message;

Review Comment:
   should we put imports on top of the file?



##########
datafusion/proto/src/logical_plan/mod.rs:
##########
@@ -207,6 +207,102 @@ impl LogicalExtensionCodec for 
DefaultLogicalExtensionCodec {
     ) -> Result<()> {
         not_impl_err!("LogicalExtensionCodec is not provided")
     }
+
+    fn try_decode_file_format(
+        &self,
+        buf: &[u8],
+        ctx: &TaskContext,
+    ) -> Result<Arc<dyn FileFormatFactory>> {
+        use prost::Message;

Review Comment:
   should we put imports on top of the file?



##########
datafusion/proto/src/logical_plan/mod.rs:
##########
@@ -207,6 +207,102 @@ impl LogicalExtensionCodec for 
DefaultLogicalExtensionCodec {
     ) -> Result<()> {
         not_impl_err!("LogicalExtensionCodec is not provided")
     }
+
+    fn try_decode_file_format(
+        &self,
+        buf: &[u8],
+        ctx: &TaskContext,
+    ) -> Result<Arc<dyn FileFormatFactory>> {
+        use prost::Message;

Review Comment:
   Would it make sense to add this import at the top of the file?



##########
datafusion/proto/src/logical_plan/mod.rs:
##########
@@ -207,6 +207,102 @@ impl LogicalExtensionCodec for 
DefaultLogicalExtensionCodec {
     ) -> Result<()> {
         not_impl_err!("LogicalExtensionCodec is not provided")
     }
+
+    fn try_decode_file_format(
+        &self,
+        buf: &[u8],
+        ctx: &TaskContext,
+    ) -> Result<Arc<dyn FileFormatFactory>> {
+        use prost::Message;
+
+        let proto = protobuf::FileFormatProto::decode(buf).map_err(|e| {
+            internal_datafusion_err!("Failed to decode FileFormatProto: {e}")
+        })?;
+
+        let kind = protobuf::FileFormatKind::try_from(proto.kind).map_err(|_| {
+            internal_datafusion_err!("Unknown FileFormatKind: {}", proto.kind)
+        })?;
+
+        match kind {
+            protobuf::FileFormatKind::Csv => 
file_formats::CsvLogicalExtensionCodec
+                .try_decode_file_format(&proto.options, ctx),
+            protobuf::FileFormatKind::Json => 
file_formats::JsonLogicalExtensionCodec
+                .try_decode_file_format(&proto.options, ctx),
+            #[cfg(feature = "parquet")]
+            protobuf::FileFormatKind::Parquet => {
+                file_formats::ParquetLogicalExtensionCodec
+                    .try_decode_file_format(&proto.options, ctx)
+            }
+            protobuf::FileFormatKind::Arrow => 
file_formats::ArrowLogicalExtensionCodec
+                .try_decode_file_format(&proto.options, ctx),
+            protobuf::FileFormatKind::Avro => 
file_formats::AvroLogicalExtensionCodec
+                .try_decode_file_format(&proto.options, ctx),
+            #[cfg(not(feature = "parquet"))]
+            protobuf::FileFormatKind::Parquet => {
+                not_impl_err!("Parquet support requires the 'parquet' feature")
+            }
+            protobuf::FileFormatKind::Unspecified => {
+                not_impl_err!("Unspecified file format kind")
+            }
+        }
+    }
+
+    fn try_encode_file_format(
+        &self,
+        buf: &mut Vec<u8>,
+        node: Arc<dyn FileFormatFactory>,
+    ) -> Result<()> {
+        use datafusion_datasource_arrow::file_format::ArrowFormatFactory;
+        use datafusion_datasource_csv::file_format::CsvFormatFactory;
+        use datafusion_datasource_json::file_format::JsonFormatFactory;
+        use prost::Message;
+
+        let any = node.as_any();
+        let mut options = Vec::new();

Review Comment:
   It's the encoded file format, not an option.



##########
datafusion/proto/proto/datafusion.proto:
##########
@@ -270,6 +270,20 @@ message CopyToNode {
   repeated string partition_by = 7;
 }
 
+enum FileFormatKind {
+  FILE_FORMAT_KIND_UNSPECIFIED = 0;
+  FILE_FORMAT_KIND_CSV = 1;
+  FILE_FORMAT_KIND_JSON = 2;
+  FILE_FORMAT_KIND_PARQUET = 3;
+  FILE_FORMAT_KIND_ARROW = 4;
+  FILE_FORMAT_KIND_AVRO = 5;
+}
+

Review Comment:
   Could we add some comments describing what this message does?



##########
datafusion/proto/src/logical_plan/mod.rs:
##########
@@ -207,6 +207,102 @@ impl LogicalExtensionCodec for 
DefaultLogicalExtensionCodec {
     ) -> Result<()> {
         not_impl_err!("LogicalExtensionCodec is not provided")
     }
+
+    fn try_decode_file_format(
+        &self,
+        buf: &[u8],
+        ctx: &TaskContext,
+    ) -> Result<Arc<dyn FileFormatFactory>> {
+        use prost::Message;
+
+        let proto = protobuf::FileFormatProto::decode(buf).map_err(|e| {
+            internal_datafusion_err!("Failed to decode FileFormatProto: {e}")
+        })?;
+
+        let kind = protobuf::FileFormatKind::try_from(proto.kind).map_err(|_| {
+            internal_datafusion_err!("Unknown FileFormatKind: {}", proto.kind)
+        })?;
+
+        match kind {
+            protobuf::FileFormatKind::Csv => 
file_formats::CsvLogicalExtensionCodec
+                .try_decode_file_format(&proto.options, ctx),
+            protobuf::FileFormatKind::Json => 
file_formats::JsonLogicalExtensionCodec
+                .try_decode_file_format(&proto.options, ctx),
+            #[cfg(feature = "parquet")]
+            protobuf::FileFormatKind::Parquet => {
+                file_formats::ParquetLogicalExtensionCodec
+                    .try_decode_file_format(&proto.options, ctx)
+            }
+            protobuf::FileFormatKind::Arrow => 
file_formats::ArrowLogicalExtensionCodec
+                .try_decode_file_format(&proto.options, ctx),
+            protobuf::FileFormatKind::Avro => 
file_formats::AvroLogicalExtensionCodec
+                .try_decode_file_format(&proto.options, ctx),
+            #[cfg(not(feature = "parquet"))]
+            protobuf::FileFormatKind::Parquet => {
+                not_impl_err!("Parquet support requires the 'parquet' feature")
+            }
+            protobuf::FileFormatKind::Unspecified => {
+                not_impl_err!("Unspecified file format kind")
+            }
+        }
+    }
+
+    fn try_encode_file_format(
+        &self,
+        buf: &mut Vec<u8>,
+        node: Arc<dyn FileFormatFactory>,
+    ) -> Result<()> {
+        use datafusion_datasource_arrow::file_format::ArrowFormatFactory;
+        use datafusion_datasource_csv::file_format::CsvFormatFactory;
+        use datafusion_datasource_json::file_format::JsonFormatFactory;
+        use prost::Message;

Review Comment:
   Same here: is there a reason why the imports can't go at the beginning of the file?



##########
datafusion/proto/src/logical_plan/mod.rs:
##########
@@ -207,6 +207,102 @@ impl LogicalExtensionCodec for 
DefaultLogicalExtensionCodec {
     ) -> Result<()> {
         not_impl_err!("LogicalExtensionCodec is not provided")
     }
+
+    fn try_decode_file_format(
+        &self,
+        buf: &[u8],
+        ctx: &TaskContext,
+    ) -> Result<Arc<dyn FileFormatFactory>> {
+        use prost::Message;
+
+        let proto = protobuf::FileFormatProto::decode(buf).map_err(|e| {
+            internal_datafusion_err!("Failed to decode FileFormatProto: {e}")
+        })?;
+
+        let kind = protobuf::FileFormatKind::try_from(proto.kind).map_err(|_| {
+            internal_datafusion_err!("Unknown FileFormatKind: {}", proto.kind)
+        })?;
+
+        match kind {
+            protobuf::FileFormatKind::Csv => 
file_formats::CsvLogicalExtensionCodec
+                .try_decode_file_format(&proto.options, ctx),
+            protobuf::FileFormatKind::Json => 
file_formats::JsonLogicalExtensionCodec
+                .try_decode_file_format(&proto.options, ctx),
+            #[cfg(feature = "parquet")]
+            protobuf::FileFormatKind::Parquet => {
+                file_formats::ParquetLogicalExtensionCodec
+                    .try_decode_file_format(&proto.options, ctx)
+            }
+            protobuf::FileFormatKind::Arrow => 
file_formats::ArrowLogicalExtensionCodec
+                .try_decode_file_format(&proto.options, ctx),
+            protobuf::FileFormatKind::Avro => 
file_formats::AvroLogicalExtensionCodec
+                .try_decode_file_format(&proto.options, ctx),
+            #[cfg(not(feature = "parquet"))]
+            protobuf::FileFormatKind::Parquet => {
+                not_impl_err!("Parquet support requires the 'parquet' feature")
+            }
+            protobuf::FileFormatKind::Unspecified => {
+                not_impl_err!("Unspecified file format kind")
+            }
+        }
+    }
+
+    fn try_encode_file_format(
+        &self,
+        buf: &mut Vec<u8>,
+        node: Arc<dyn FileFormatFactory>,
+    ) -> Result<()> {
+        use datafusion_datasource_arrow::file_format::ArrowFormatFactory;
+        use datafusion_datasource_csv::file_format::CsvFormatFactory;
+        use datafusion_datasource_json::file_format::JsonFormatFactory;
+        use prost::Message;
+
+        let any = node.as_any();

Review Comment:
   Do we need to extract a variable here?



##########
datafusion/proto/src/logical_plan/mod.rs:
##########
@@ -207,6 +207,102 @@ impl LogicalExtensionCodec for 
DefaultLogicalExtensionCodec {
     ) -> Result<()> {
         not_impl_err!("LogicalExtensionCodec is not provided")
     }
+
+    fn try_decode_file_format(
+        &self,
+        buf: &[u8],
+        ctx: &TaskContext,
+    ) -> Result<Arc<dyn FileFormatFactory>> {
+        use prost::Message;
+
+        let proto = protobuf::FileFormatProto::decode(buf).map_err(|e| {
+            internal_datafusion_err!("Failed to decode FileFormatProto: {e}")
+        })?;
+
+        let kind = protobuf::FileFormatKind::try_from(proto.kind).map_err(|_| {
+            internal_datafusion_err!("Unknown FileFormatKind: {}", proto.kind)
+        })?;
+
+        match kind {
+            protobuf::FileFormatKind::Csv => 
file_formats::CsvLogicalExtensionCodec
+                .try_decode_file_format(&proto.options, ctx),
+            protobuf::FileFormatKind::Json => 
file_formats::JsonLogicalExtensionCodec
+                .try_decode_file_format(&proto.options, ctx),
+            #[cfg(feature = "parquet")]
+            protobuf::FileFormatKind::Parquet => {
+                file_formats::ParquetLogicalExtensionCodec
+                    .try_decode_file_format(&proto.options, ctx)
+            }
+            protobuf::FileFormatKind::Arrow => 
file_formats::ArrowLogicalExtensionCodec
+                .try_decode_file_format(&proto.options, ctx),
+            protobuf::FileFormatKind::Avro => 
file_formats::AvroLogicalExtensionCodec
+                .try_decode_file_format(&proto.options, ctx),
+            #[cfg(not(feature = "parquet"))]
+            protobuf::FileFormatKind::Parquet => {
+                not_impl_err!("Parquet support requires the 'parquet' feature")
+            }
+            protobuf::FileFormatKind::Unspecified => {
+                not_impl_err!("Unspecified file format kind")
+            }
+        }
+    }
+
+    fn try_encode_file_format(
+        &self,
+        buf: &mut Vec<u8>,
+        node: Arc<dyn FileFormatFactory>,
+    ) -> Result<()> {
+        use datafusion_datasource_arrow::file_format::ArrowFormatFactory;
+        use datafusion_datasource_csv::file_format::CsvFormatFactory;
+        use datafusion_datasource_json::file_format::JsonFormatFactory;
+        use prost::Message;
+
+        let any = node.as_any();

Review Comment:
   Do we need to extract it as a variable here?



##########
datafusion/proto/src/logical_plan/mod.rs:
##########
@@ -207,6 +207,102 @@ impl LogicalExtensionCodec for 
DefaultLogicalExtensionCodec {
     ) -> Result<()> {
         not_impl_err!("LogicalExtensionCodec is not provided")
     }
+
+    fn try_decode_file_format(
+        &self,
+        buf: &[u8],
+        ctx: &TaskContext,
+    ) -> Result<Arc<dyn FileFormatFactory>> {
+        use prost::Message;
+
+        let proto = protobuf::FileFormatProto::decode(buf).map_err(|e| {
+            internal_datafusion_err!("Failed to decode FileFormatProto: {e}")
+        })?;
+
+        let kind = protobuf::FileFormatKind::try_from(proto.kind).map_err(|_| {
+            internal_datafusion_err!("Unknown FileFormatKind: {}", proto.kind)
+        })?;
+
+        match kind {
+            protobuf::FileFormatKind::Csv => 
file_formats::CsvLogicalExtensionCodec
+                .try_decode_file_format(&proto.options, ctx),
+            protobuf::FileFormatKind::Json => 
file_formats::JsonLogicalExtensionCodec
+                .try_decode_file_format(&proto.options, ctx),
+            #[cfg(feature = "parquet")]
+            protobuf::FileFormatKind::Parquet => {
+                file_formats::ParquetLogicalExtensionCodec
+                    .try_decode_file_format(&proto.options, ctx)
+            }
+            protobuf::FileFormatKind::Arrow => 
file_formats::ArrowLogicalExtensionCodec
+                .try_decode_file_format(&proto.options, ctx),
+            protobuf::FileFormatKind::Avro => 
file_formats::AvroLogicalExtensionCodec
+                .try_decode_file_format(&proto.options, ctx),
+            #[cfg(not(feature = "parquet"))]
+            protobuf::FileFormatKind::Parquet => {
+                not_impl_err!("Parquet support requires the 'parquet' feature")
+            }
+            protobuf::FileFormatKind::Unspecified => {
+                not_impl_err!("Unspecified file format kind")
+            }
+        }
+    }
+
+    fn try_encode_file_format(
+        &self,
+        buf: &mut Vec<u8>,
+        node: Arc<dyn FileFormatFactory>,
+    ) -> Result<()> {
+        use datafusion_datasource_arrow::file_format::ArrowFormatFactory;
+        use datafusion_datasource_csv::file_format::CsvFormatFactory;
+        use datafusion_datasource_json::file_format::JsonFormatFactory;
+        use prost::Message;
+
+        let any = node.as_any();
+        let mut options = Vec::new();

Review Comment:
   Maybe use a name other than `options`?



##########
datafusion/proto/proto/datafusion.proto:
##########
@@ -270,6 +270,20 @@ message CopyToNode {
   repeated string partition_by = 7;
 }
 
+enum FileFormatKind {
+  FILE_FORMAT_KIND_UNSPECIFIED = 0;
+  FILE_FORMAT_KIND_CSV = 1;
+  FILE_FORMAT_KIND_JSON = 2;
+  FILE_FORMAT_KIND_PARQUET = 3;
+  FILE_FORMAT_KIND_ARROW = 4;
+  FILE_FORMAT_KIND_AVRO = 5;
+}
+
+message FileFormatProto {
+  FileFormatKind kind = 1;
+  bytes options = 2;

Review Comment:
   It's not an `option`; it's the encoded file format. Maybe use a more informative name?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to