mustafasrepo commented on code in PR #6526:
URL: https://github.com/apache/arrow-datafusion/pull/6526#discussion_r1218099874
##########
datafusion/core/src/datasource/file_format/csv.rs:
##########
@@ -324,6 +349,196 @@ fn build_schema_helper(names: Vec<String>, types:
&[HashSet<DataType>]) -> Schem
Schema::new(fields)
}
+impl Default for CsvSerializer {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
/// Define a struct for serializing CSV records to a stream
pub struct CsvSerializer {
    // CSV writer builder used to construct a fresh writer for each batch
    builder: WriterBuilder,
    // Inner buffer reused across batches to avoid reallocation; drained
    // (not replaced) after each serialize call so its capacity is kept
    buffer: Vec<u8>,
    // Flag to indicate whether a header row should be written; cleared
    // after the first batch is serialized so the header appears only once
    header: bool,
}
+
+impl CsvSerializer {
+ /// Constructor for the CsvSerializer object
+ pub fn new() -> Self {
+ Self {
+ builder: WriterBuilder::new(),
+ header: true,
+ buffer: Vec::with_capacity(4096),
+ }
+ }
+
+ /// Method for setting the CSV writer builder
+ pub fn with_builder(mut self, builder: WriterBuilder) -> Self {
+ self.builder = builder;
+ self
+ }
+
+ /// Method for setting the CSV writer header status
+ pub fn with_header(mut self, header: bool) -> Self {
+ self.header = header;
+ self
+ }
+}
+
+#[async_trait]
+impl BatchSerializer for CsvSerializer {
+ async fn serialize(&mut self, batch: RecordBatch) -> Result<Bytes> {
+ let builder = self.builder.clone();
+ let mut writer = builder.has_headers(self.header).build(&mut
self.buffer);
+ writer.write(&batch)?;
+ drop(writer);
+ self.header = false;
+ Ok(Bytes::from(self.buffer.drain(..).collect::<Vec<u8>>()))
+ }
+}
+
+async fn check_for_errors<T>(
+ result: Result<T>,
+ writers: &mut [Box<dyn FileWriterExt>],
+) -> Result<T> {
+ match result {
+ Ok(value) => Ok(value),
+ Err(e) => {
+ // Abort all writers before returning the error:
+ for writer in writers {
+ let mut abort_future = writer.abort_writer();
+ if let Ok(abort_future) = &mut abort_future {
+ let _ = abort_future.await;
+ }
+ // Ignore errors that occur during abortion,
+ // We do try to abort all writers before returning error.
+ }
+ // After aborting writers return original error.
+ Err(e)
+ }
+ }
+}
+
/// Implements [`DataSink`] for writing to a CSV file.
struct CsvSink {
    /// Config options for writing data
    config: FileSinkConfig,
    /// Whether to emit a column header row; suppressed in `write_all`
    /// when appending to a non-empty file
    has_header: bool,
    // Field delimiter byte passed to the CSV writer
    delimiter: u8,
    // Compression applied to the written files
    file_compression_type: FileCompressionType,
}
+
+impl Debug for CsvSink {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("CsvSink")
+ .field("has_header", &self.has_header)
+ .field("delimiter", &self.delimiter)
+ .field("file_compression_type", &self.file_compression_type)
+ .finish()
+ }
+}
+
+impl Display for CsvSink {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(
+ f,
+ "CsvSink(writer_mode={:?}, file_groups={})",
+ self.config.writer_mode,
+ FileGroupDisplay(&self.config.file_groups),
+ )
+ }
+}
+
+impl CsvSink {
+ fn new(
+ config: FileSinkConfig,
+ has_header: bool,
+ delimiter: u8,
+ file_compression_type: FileCompressionType,
+ ) -> Self {
+ Self {
+ config,
+ has_header,
+ delimiter,
+ file_compression_type,
+ }
+ }
+}
+
+#[async_trait]
+impl DataSink for CsvSink {
+ async fn write_all(
+ &self,
+ mut data: SendableRecordBatchStream,
+ context: &Arc<TaskContext>,
+ ) -> Result<u64> {
+ let num_partitions = self.config.file_groups.len();
+
+ let object_store = context
+ .runtime_env()
+ .object_store(&self.config.object_store_url)?;
+ let opener = CsvWriterOpener::new(
+ self.config.writer_mode,
+ object_store,
+ self.file_compression_type.to_owned(),
+ );
+
+ // Construct serializer and writer for each file group
+ let mut serializers = vec![];
+ let mut writers = vec![];
+ for file_group in &self.config.file_groups {
+ let header = self.has_header
+ && (!matches!(&self.config.writer_mode, FileWriterMode::Append)
+ || file_group.object_meta.size == 0);
Review Comment:
Exactly, I have simplified the condition
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]