This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new d5367f3ff5 Avoid calling shutdown after failed write of AsyncWrite (#249) (#250) (#11415)
d5367f3ff5 is described below

commit d5367f3ff5ed506e824a04c68120194deb68a908
Author: Georgi Krastev <[email protected]>
AuthorDate: Fri Jul 12 22:34:35 2024 +0300

    Avoid calling shutdown after failed write of AsyncWrite (#249) (#250) (#11415)
    
    in `serialize_rb_stream_to_object_store`
---
 .../datasource/file_format/write/orchestration.rs  | 27 +++++++++++++---------
 1 file changed, 16 insertions(+), 11 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/write/orchestration.rs b/datafusion/core/src/datasource/file_format/write/orchestration.rs
index a62b5715ae..8bd0dae9f5 100644
--- a/datafusion/core/src/datasource/file_format/write/orchestration.rs
+++ b/datafusion/core/src/datasource/file_format/write/orchestration.rs
@@ -42,15 +42,20 @@ use tokio::task::JoinSet;
 type WriterType = Box<dyn AsyncWrite + Send + Unpin>;
 type SerializerType = Arc<dyn BatchSerializer>;
 
-/// Serializes a single data stream in parallel and writes to an ObjectStore
-/// concurrently. Data order is preserved. In the event of an error,
-/// the ObjectStore writer is returned to the caller in addition to an error,
-/// so that the caller may handle aborting failed writes.
+/// Serializes a single data stream in parallel and writes to an ObjectStore concurrently.
+/// Data order is preserved.
+///
+/// In the event of a non-IO error which does not involve the ObjectStore writer,
+/// the writer is returned to the caller in addition to the error,
+/// so that failed writes may be aborted.
+///
+/// In the event of an IO error involving the ObjectStore writer,
+/// the writer is dropped to avoid calling further methods on it which might panic.
 pub(crate) async fn serialize_rb_stream_to_object_store(
     mut data_rx: Receiver<RecordBatch>,
     serializer: Arc<dyn BatchSerializer>,
     mut writer: WriterType,
-) -> std::result::Result<(WriterType, u64), (WriterType, DataFusionError)> {
+) -> std::result::Result<(WriterType, u64), (Option<WriterType>, 
DataFusionError)> {
     let (tx, mut rx) =
         mpsc::channel::<SpawnedTask<Result<(usize, Bytes), 
DataFusionError>>>(100);
     let serialize_task = SpawnedTask::spawn(async move {
@@ -82,7 +87,7 @@ pub(crate) async fn serialize_rb_stream_to_object_store(
                     Ok(_) => (),
                     Err(e) => {
                         return Err((
-                            writer,
+                            None,
                             DataFusionError::Execution(format!(
                                 "Error writing to object store: {e}"
                             )),
@@ -93,12 +98,12 @@ pub(crate) async fn serialize_rb_stream_to_object_store(
             }
             Ok(Err(e)) => {
                 // Return the writer along with the error
-                return Err((writer, e));
+                return Err((Some(writer), e));
             }
             Err(e) => {
                 // Handle task panic or cancellation
                 return Err((
-                    writer,
+                    Some(writer),
                     DataFusionError::Execution(format!(
                         "Serialization task panicked or was cancelled: {e}"
                     )),
@@ -109,10 +114,10 @@ pub(crate) async fn serialize_rb_stream_to_object_store(
 
     match serialize_task.join().await {
         Ok(Ok(_)) => (),
-        Ok(Err(e)) => return Err((writer, e)),
+        Ok(Err(e)) => return Err((Some(writer), e)),
         Err(_) => {
             return Err((
-                writer,
+                Some(writer),
                 internal_datafusion_err!("Unknown error writing to object 
store"),
             ))
         }
@@ -153,7 +158,7 @@ pub(crate) async fn stateless_serialize_and_write_files(
                     row_count += cnt;
                 }
                 Err((writer, e)) => {
-                    finished_writers.push(writer);
+                    finished_writers.extend(writer);
                     any_errors = true;
                     triggering_error = Some(e);
                 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to