alamb commented on code in PR #7452:
URL: https://github.com/apache/arrow-datafusion/pull/7452#discussion_r1317665396


##########
datafusion/core/src/datasource/file_format/write.rs:
##########
@@ -330,43 +391,117 @@ pub(crate) async fn stateless_serialize_and_write_files(
         return internal_err!("single_file_output is true, but got more than 1 
writer!");
     }
     let num_partitions = data.len();
-    if !single_file_output && (num_partitions != writers.len()) {
+    let num_writers = writers.len();
+    if !single_file_output && (num_partitions != num_writers) {
         return internal_err!("single_file_ouput is false, but did not get 1 
writer for each output partition!");
     }
     let mut row_count = 0;
-    // Map errors to DataFusionError.
-    let err_converter =
-        |_| DataFusionError::Internal("Unexpected FileSink Error".to_string());
-    // TODO parallelize serialization across partitions and batches within partitions
-    // see: https://github.com/apache/arrow-datafusion/issues/7079
-    for (part_idx, data_stream) in data.iter_mut().enumerate().take(num_partitions) {
-        let idx = match single_file_output {
-            false => part_idx,
-            true => 0,
-        };
-        while let Some(maybe_batch) = data_stream.next().await {
-            // Write data to files in a round robin fashion:
-            let serializer = &mut serializers[idx];
-            let batch = check_for_errors(maybe_batch, &mut writers).await?;
-            row_count += batch.num_rows();
-            let bytes =
-                check_for_errors(serializer.serialize(batch).await, &mut writers).await?;
-            let writer = &mut writers[idx];
-            check_for_errors(
-                writer.write_all(&bytes).await.map_err(err_converter),
-                &mut writers,
-            )
-            .await?;
+    // tracks whether any writer encountered an error, triggering the need to abort
+    let mut any_errors = false;
+    // tracks the specific error that triggered the abort
+    let mut triggering_error = None;
+    // tracks whether any errors were encountered while aborting writers.
+    // if true, there is no guarantee that all written data was cleaned up.
+    let mut any_abort_errors = false;
+    match single_file_output {
+        false => {
+            let mut join_set = JoinSet::new();
+            for (data_stream, serializer, writer) in data
+                .into_iter()
+                .zip(serializers.into_iter())
+                .zip(writers.into_iter())
+                .map(|((a, b), c)| (a, b, c))
+            {
+                join_set.spawn(async move {
+                    serialize_rb_stream_to_object_store(data_stream, serializer, writer)
+                        .await
+                });
+            }
+            let mut finished_writers = Vec::with_capacity(num_writers);
+            while let Some(result) = join_set.join_next().await {
+                match result {
+                    Ok(res) => match res {
+                        Ok((_, writer, cnt)) => {
+                            finished_writers.push(writer);
+                            row_count += cnt;
+                        }
+                        Err((writer, e)) => {
+                            finished_writers.push(writer);
+                            any_errors = true;
+                            triggering_error = Some(e);
+                        }
+                    },
+                    Err(_) => {
+                        // Don't panic; instead, try to clean up as many writers as possible.
+                        // If we hit this code, ownership of a writer was not joined back to
+                        // this thread, so we cannot clean it up (hence any_abort_errors is true)
+                        any_errors = true;
+                        any_abort_errors = true;
+                    }
+                }
+            }
+
+            // Finalize or abort writers as appropriate
+            for mut writer in finished_writers.into_iter() {
+                match any_errors {
+                    true => {
+                        let abort_result = writer.abort_writer();
+                        if abort_result.is_err() {
+                            any_abort_errors = true;
+                        }
+                    }
+                    false => {
+                        // TODO if we encounter an error during shutdown, delete previously written files?
+                        writer.shutdown()
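
The `Err(_)` arm above is the reason each spawned task returns its writer in both the success and failure cases: a task that fails to join never hands its writer back, so that writer can never be aborted. Below is a minimal, self-contained sketch of this ownership-return pattern; the `Writer` type is a hypothetical stand-in for the real writer type, with errors simplified to `String`:

```rust
use tokio::task::JoinSet;

// Hypothetical stand-in for the real abortable writer type.
struct Writer;

impl Writer {
    async fn shutdown(self) -> Result<(), String> {
        Ok(())
    }
    fn abort_writer(self) -> Result<(), String> {
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let writers = vec![Writer, Writer];
    let mut join_set: JoinSet<Result<(Writer, usize), (Writer, String)>> = JoinSet::new();
    for writer in writers {
        join_set.spawn(async move {
            // ... serialize and write record batches here ...
            // Ownership of the writer is returned in both variants so the
            // parent task can finalize or abort it.
            Ok((writer, 0))
        });
    }

    let mut finished = Vec::new();
    let mut any_errors = false;
    while let Some(result) = join_set.join_next().await {
        match result {
            Ok(Ok((writer, _row_count))) => finished.push(writer),
            Ok(Err((writer, _e))) => {
                finished.push(writer);
                any_errors = true;
            }
            // The task panicked or was cancelled: its writer was never
            // handed back, so it cannot be aborted from here.
            Err(_join_error) => any_errors = true,
        }
    }

    // Finalize everything on success, or abort everything we got back.
    for writer in finished {
        if any_errors {
            let _ = writer.abort_writer();
        } else {
            let _ = writer.shutdown().await;
        }
    }
}
```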

Review Comment:
   Yes, I agree that if someone wants atomic commit/rollback, they should build that in at a higher level than datafusion -- there isn't much we can do with just the object store API.
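
   For illustration, a rough sketch of what such best-effort, higher-level rollback could look like on top of the `object_store` crate. `put_all_or_cleanup` is a hypothetical helper (assuming the `put(&Path, Bytes)` signature), and it is still not atomic: a concurrent reader can observe partial output before the deletes run, and a crash mid-cleanup leaves orphaned objects behind.

```rust
use bytes::Bytes;
use object_store::{path::Path, ObjectStore};
use std::sync::Arc;

// Hypothetical helper: write a set of objects, and if any write fails,
// delete everything written so far. Cleanup is best-effort only.
async fn put_all_or_cleanup(
    store: Arc<dyn ObjectStore>,
    files: Vec<(Path, Bytes)>,
) -> object_store::Result<()> {
    let mut written: Vec<Path> = Vec::new();
    for (path, bytes) in files {
        match store.put(&path, bytes).await {
            Ok(_) => written.push(path),
            Err(e) => {
                // Roll back earlier writes; ignore individual delete
                // failures since there is nothing better we can do here.
                for p in &written {
                    let _ = store.delete(p).await;
                }
                return Err(e);
            }
        }
    }
    Ok(())
}
```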


