alamb commented on code in PR #7452:
URL: https://github.com/apache/arrow-datafusion/pull/7452#discussion_r1320160930
##########
datafusion/core/src/datasource/file_format/write.rs:
##########
@@ -315,58 +300,237 @@ pub(crate) async fn create_writer(
}
}
+/// Serializes a single data stream in parallel and writes to an ObjectStore
+/// concurrently. Data order is preserved. In the event of an error,
+/// the ObjectStore writer is returned to the caller in addition to an error,
+/// so that the caller may handle aborting failed writes.
+async fn serialize_rb_stream_to_object_store(
Review Comment:
I have some ideas for how to simplify this code, which I will try out shortly,
but I also think it can be merged as is.
##########
datafusion/core/src/datasource/listing_table_factory.rs:
##########
@@ -213,7 +220,8 @@ impl TableProviderFactory for ListingTableFactory {
.with_file_sort_order(cmd.order_exprs.clone())
.with_insert_mode(insert_mode)
.with_single_file(single_file)
- .with_write_options(file_type_writer_options);
+ .with_write_options(file_type_writer_options)
+ .with_infinite_source(unbounded);
Review Comment:
I took the liberty of doing so in e6f1b2fe5a0219118919da5eaa24ac9293334ff3
##########
datafusion/core/src/datasource/file_format/write.rs:
##########
@@ -237,29 +240,11 @@ pub enum FileWriterMode {
pub trait BatchSerializer: Unpin + Send {
/// Asynchronously serializes a `RecordBatch` and returns the serialized
bytes.
async fn serialize(&mut self, batch: RecordBatch) -> Result<Bytes>;
-}
-
-/// Checks if any of the passed writers have encountered an error
-/// and if so, all writers are aborted.
-async fn check_for_errors<T, W: AsyncWrite + Unpin + Send>(
- result: Result<T>,
- writers: &mut [AbortableWrite<W>],
-) -> Result<T> {
- match result {
- Ok(value) => Ok(value),
- Err(e) => {
- // Abort all writers before returning the error:
- for writer in writers {
- let mut abort_future = writer.abort_writer();
- if let Ok(abort_future) = &mut abort_future {
- let _ = abort_future.await;
- }
- // Ignore errors that occur during abortion,
- // We do try to abort all writers before returning error.
- }
- // After aborting writers return original error.
- Err(e)
- }
+ /// Duplicates self to support serializing multiple batches in parralell
on multiple cores
Review Comment:
```suggestion
/// Duplicates self to support serializing multiple batches in parallel
/// on multiple cores
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]