mustafasrepo commented on code in PR #6526:
URL: https://github.com/apache/arrow-datafusion/pull/6526#discussion_r1217871090
##########
datafusion/core/src/datasource/file_format/csv.rs:
##########
@@ -324,6 +349,188 @@ fn build_schema_helper(names: Vec<String>, types: &[HashSet<DataType>]) -> Schem
Schema::new(fields)
}
+impl Default for CsvSerializer {
+    /// Same as [`CsvSerializer::new`]: a default-configured writer builder,
+    /// with the header row enabled for the first serialized batch.
+    fn default() -> Self {
+        CsvSerializer::new()
+    }
+}
+
+/// Serializes Arrow record batches into CSV-encoded bytes, one batch per
+/// `serialize` call, writing the header row (if enabled) only once.
+pub struct CsvSerializer {
+ /// Builder cloned on every `serialize` call to create a fresh CSV writer
+ builder: WriterBuilder,
+ /// Scratch buffer the writer appends into; reused across calls so its
+ /// allocation can be kept between batches
+ buffer: Vec<u8>,
+ /// Whether the next serialized batch is preceded by a header row; set to
+ /// `false` after a batch is serialized so the header is emitted only once
+ header: bool,
+}
+
+impl CsvSerializer {
+    /// Creates a serializer with a default CSV [`WriterBuilder`], header output
+    /// enabled for the first batch, and a scratch buffer pre-sized to 4096 bytes.
+    pub fn new() -> Self {
+        let builder = WriterBuilder::new();
+        let buffer = Vec::with_capacity(4096);
+        Self {
+            builder,
+            buffer,
+            header: true,
+        }
+    }
+
+    /// Replaces the [`WriterBuilder`] used to configure the CSV writer.
+    pub fn with_builder(self, builder: WriterBuilder) -> Self {
+        Self { builder, ..self }
+    }
+
+    /// Sets whether a header row is written before the first serialized batch.
+    pub fn with_header(self, header: bool) -> Self {
+        Self { header, ..self }
+    }
+}
+
+#[async_trait]
+impl BatchSerializer for CsvSerializer {
+    /// Encodes `batch` as CSV and returns the encoded bytes.
+    ///
+    /// If headers are enabled, a header row is emitted only with the first
+    /// successfully serialized batch; later calls produce data rows only.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the CSV writer fails to encode `batch`. In that case
+    /// any partially written output is discarded and the header flag is left
+    /// unchanged. The next call therefore starts from an empty buffer instead
+    /// of prepending stale bytes (or a duplicated header) to its output.
+    async fn serialize(&mut self, batch: RecordBatch) -> Result<Bytes> {
+        // A new writer is built per call from a copy of the configured builder;
+        // it appends into the reusable scratch buffer.
+        let mut writer = self
+            .builder
+            .clone()
+            .has_headers(self.header)
+            .build(&mut self.buffer);
+        let written = writer.write(&batch);
+        // Dropping the writer ends its mutable borrow of `self.buffer`
+        // (and presumably flushes any output it still holds internally).
+        drop(writer);
+        if let Err(e) = written {
+            // Discard partial output so it cannot leak into the next batch.
+            self.buffer.clear();
+            return Err(e.into());
+        }
+        self.header = false;
+        // Copy the encoded bytes out, then clear the buffer while keeping its
+        // capacity for the next batch.
+        let bytes = Bytes::copy_from_slice(&self.buffer);
+        self.buffer.clear();
+        Ok(bytes)
+    }
+}
+
+/// This macro tries to abort all writers before returning a possible error.
+macro_rules! handle_err_or_continue {
+ ($result:expr, $writers:expr) => {
+ match $result {
+ Ok(value) => Ok(value),
+ Err(e) => {
+ // Abort all writers before returning the error:
+ for writer in $writers {
+ let abort_future = writer.abort_writer()?;
Review Comment:
I converted the macro to a function. Also, if aborting a writer returns an error,
we no longer return early; we keep trying to abort all remaining writers.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]