alamb commented on code in PR #7141:
URL: https://github.com/apache/arrow-datafusion/pull/7141#discussion_r1284824501
##########
datafusion/core/src/datasource/file_format/csv.rs:
##########
@@ -572,44 +578,86 @@ impl DataSink for CsvSink {
         // Construct serializer and writer for each file group
         let mut serializers = vec![];
         let mut writers = vec![];
-        for file_group in &self.config.file_groups {
-            // In append mode, consider has_header flag only when file is empty (at the start).
-            // For other modes, use has_header flag as is.
-            let header = self.has_header
-                && (!matches!(&self.config.writer_mode, FileWriterMode::Append)
-                    || file_group.object_meta.size == 0);
-            let builder = WriterBuilder::new().with_delimiter(self.delimiter);
-            let serializer = CsvSerializer::new()
-                .with_builder(builder)
-                .with_header(header);
-            serializers.push(serializer);
-
-            let file = file_group.clone();
-            let writer = self
-                .create_writer(file.object_meta.clone().into(), object_store.clone())
-                .await?;
-            writers.push(writer);
+        match self.config.writer_mode {
+            FileWriterMode::Append => {
+                for file_group in &self.config.file_groups {
+                    // In append mode, consider has_header flag only when file is empty (at the start).
+                    // For other modes, use has_header flag as is.
+                    let header = self.has_header
+                        && (!matches!(&self.config.writer_mode, FileWriterMode::Append)
+                            || file_group.object_meta.size == 0);
+                    let builder = WriterBuilder::new().with_delimiter(self.delimiter);
+                    let serializer = CsvSerializer::new()
+                        .with_builder(builder)
+                        .with_header(header);
+                    serializers.push(serializer);
+
+                    let file = file_group.clone();
+                    let writer = self
+                        .create_writer(
+                            file.object_meta.clone().into(),
+                            object_store.clone(),
+                        )
+                        .await?;
+                    writers.push(writer);
+                }
+            }
+            FileWriterMode::Put => {
+                return Err(DataFusionError::NotImplemented(
+                    "Put Mode is not implemented for CSV Sink yet".into(),
+                ))
+            }
+            FileWriterMode::PutMultipart => {
+                //currently assuming only 1 partition path (i.e. not hive style partitioning on a column)
+                let base_path = &self.config.table_paths[0];
+                //uniquely identify this batch of files with a random string, to prevent collisions overwriting files
+                let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
+                for part_idx in 0..num_partitions {
+                    let header = true;
+                    let builder = WriterBuilder::new().with_delimiter(self.delimiter);
+                    let serializer = CsvSerializer::new()
+                        .with_builder(builder)
+                        .with_header(header);
+                    serializers.push(serializer);
+                    let file_path = base_path
+                        .prefix()
+                        .child(format!("/{}_{}.csv", write_id, part_idx));
+                    let object_meta = ObjectMeta {
+                        location: file_path,
+                        last_modified: chrono::offset::Utc::now(),
+                        size: 0,
+                        e_tag: None,
+                    };
+                    let writer = self
+                        .create_writer(object_meta.into(), object_store.clone())
+                        .await?;
+                    writers.push(writer);
+                }
+            }
         }
-        let mut idx = 0;
         let mut row_count = 0;
         // Map errors to DatafusionError.
         let err_converter =
             |_| DataFusionError::Internal("Unexpected FileSink Error".to_string());
-        while let Some(maybe_batch) = data.next().await {
-            // Write data to files in a round robin fashion:
-            idx = (idx + 1) % num_partitions;
-            let serializer = &mut serializers[idx];
-            let batch = check_for_errors(maybe_batch, &mut writers).await?;
-            row_count += batch.num_rows();
-            let bytes =
-                check_for_errors(serializer.serialize(batch).await, &mut writers).await?;
-            let writer = &mut writers[idx];
-            check_for_errors(
-                writer.write_all(&bytes).await.map_err(err_converter),
-                &mut writers,
-            )
-            .await?;
+        // TODO parallelize serialization accross partitions and batches within partitions
Review Comment:
This TODO can probably be done with some fancy async stream stuff -- however, I got a little hung up on how to handle the abort case. I'll try to think on it some more.
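
For reference, here is a minimal, self-contained sketch (not DataFusion's actual API) of the kind of async-stream approach the TODO hints at. The `serialize` function and `Writer` type below are hypothetical stand-ins for the CSV serializer and the abortable object-store writer; the point is just that serialization can be buffered concurrently on blocking threads while writes stay ordered, and that the abort path is handled explicitly when either serialization or a write fails.

```rust
// Assumed Cargo deps: tokio = { version = "1", features = ["full"] }, futures = "0.3"
use futures::stream::{self, StreamExt};
use std::io;

// Hypothetical stand-in for CPU-bound batch serialization.
fn serialize(batch: &str) -> Vec<u8> {
    batch.as_bytes().to_vec()
}

// Hypothetical stand-in for an abortable object-store writer.
struct Writer {
    buf: Vec<u8>,
}

impl Writer {
    async fn write_all(&mut self, bytes: &[u8]) -> io::Result<()> {
        self.buf.extend_from_slice(bytes);
        Ok(())
    }
    async fn abort(self) -> io::Result<()> {
        // In a real multipart upload this would cancel the in-flight upload.
        Ok(())
    }
}

#[tokio::main]
async fn main() -> io::Result<()> {
    let batches = vec!["a,b\n1,2\n", "a,b\n3,4\n", "a,b\n5,6\n"];
    let mut writer = Writer { buf: Vec::new() };

    // Serialize up to 4 batches concurrently on blocking threads; `buffered`
    // (rather than `buffer_unordered`) keeps the write order deterministic.
    let mut serialized = stream::iter(batches)
        .map(|batch| tokio::task::spawn_blocking(move || serialize(batch)))
        .buffered(4);

    while let Some(joined) = serialized.next().await {
        let result = match joined {
            Ok(bytes) => writer.write_all(&bytes).await,
            Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
        };
        if let Err(e) = result {
            // The abort case: tear down the writer before surfacing the error
            // so no partially written object is left behind.
            writer.abort().await?;
            return Err(e);
        }
    }
    println!("wrote {} bytes", writer.buf.len());
    Ok(())
}
```

In the real sink this would presumably fan out per partition as well, and the tricky part the comment mentions remains: every in-flight multipart writer has to be aborted, not just the one that hit the error.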