This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new d232065c3b refactor: rewrite mega type to an enum containing both cases (#11539)
d232065c3b is described below

commit d232065c3b710d0c8e035de49730238a30073eb2
Author: Lorrens Pantelis <[email protected]>
AuthorDate: Sun Jul 21 14:31:41 2024 +0200

    refactor: rewrite mega type to an enum containing both cases (#11539)
---
 .../datasource/file_format/write/orchestration.rs  | 62 +++++++++++++++++-----
 1 file changed, 48 insertions(+), 14 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/write/orchestration.rs b/datafusion/core/src/datasource/file_format/write/orchestration.rs
index f788865b07..1d32063ee9 100644
--- a/datafusion/core/src/datasource/file_format/write/orchestration.rs
+++ b/datafusion/core/src/datasource/file_format/write/orchestration.rs
@@ -42,6 +42,37 @@ use tokio::task::JoinSet;
 type WriterType = Box<dyn AsyncWrite + Send + Unpin>;
 type SerializerType = Arc<dyn BatchSerializer>;
 
+/// Result of calling [`serialize_rb_stream_to_object_store`]
+pub(crate) enum SerializedRecordBatchResult {
+    Success {
+        /// the writer
+        writer: WriterType,
+
+        /// the number of rows successfully written
+        row_count: usize,
+    },
+    Failure {
+        /// As explained in [`serialize_rb_stream_to_object_store`]:
+        /// - If an IO error occurred that involved the ObjectStore writer, then the writer will not be returned to the caller
+        /// - Otherwise, the writer is returned to the caller
+        writer: Option<WriterType>,
+
+        /// the actual error that occurred
+        err: DataFusionError,
+    },
+}
+
+impl SerializedRecordBatchResult {
+    /// Create the success variant
+    pub fn success(writer: WriterType, row_count: usize) -> Self {
+        Self::Success { writer, row_count }
+    }
+
+    pub fn failure(writer: Option<WriterType>, err: DataFusionError) -> Self {
+        Self::Failure { writer, err }
+    }
+}
+
 /// Serializes a single data stream in parallel and writes to an ObjectStore concurrently.
 /// Data order is preserved.
 ///
@@ -55,7 +86,7 @@ pub(crate) async fn serialize_rb_stream_to_object_store(
     mut data_rx: Receiver<RecordBatch>,
     serializer: Arc<dyn BatchSerializer>,
     mut writer: WriterType,
-) -> std::result::Result<(WriterType, u64), (Option<WriterType>, DataFusionError)> {
+) -> SerializedRecordBatchResult {
     let (tx, mut rx) =
         mpsc::channel::<SpawnedTask<Result<(usize, Bytes), DataFusionError>>>(100);
     let serialize_task = SpawnedTask::spawn(async move {
@@ -86,43 +117,43 @@ pub(crate) async fn serialize_rb_stream_to_object_store(
                 match writer.write_all(&bytes).await {
                     Ok(_) => (),
                     Err(e) => {
-                        return Err((
+                        return SerializedRecordBatchResult::failure(
                             None,
                             DataFusionError::Execution(format!(
                                 "Error writing to object store: {e}"
                             )),
-                        ))
+                        )
                     }
                 };
                 row_count += cnt;
             }
             Ok(Err(e)) => {
                 // Return the writer along with the error
-                return Err((Some(writer), e));
+                return SerializedRecordBatchResult::failure(Some(writer), e);
             }
             Err(e) => {
                 // Handle task panic or cancellation
-                return Err((
+                return SerializedRecordBatchResult::failure(
                     Some(writer),
                     DataFusionError::Execution(format!(
                         "Serialization task panicked or was cancelled: {e}"
                     )),
-                ));
+                );
             }
         }
     }
 
     match serialize_task.join().await {
         Ok(Ok(_)) => (),
-        Ok(Err(e)) => return Err((Some(writer), e)),
+        Ok(Err(e)) => return SerializedRecordBatchResult::failure(Some(writer), e),
         Err(_) => {
-            return Err((
+            return SerializedRecordBatchResult::failure(
                 Some(writer),
                 internal_datafusion_err!("Unknown error writing to object store"),
-            ))
+            )
         }
     }
-    Ok((writer, row_count as u64))
+    SerializedRecordBatchResult::success(writer, row_count)
 }
 
 type FileWriteBundle = (Receiver<RecordBatch>, SerializerType, WriterType);
@@ -153,14 +184,17 @@ pub(crate) async fn stateless_serialize_and_write_files(
     while let Some(result) = join_set.join_next().await {
         match result {
             Ok(res) => match res {
-                Ok((writer, cnt)) => {
+                SerializedRecordBatchResult::Success {
+                    writer,
+                    row_count: cnt,
+                } => {
                     finished_writers.push(writer);
                     row_count += cnt;
                 }
-                Err((writer, e)) => {
+                SerializedRecordBatchResult::Failure { writer, err } => {
                     finished_writers.extend(writer);
                     any_errors = true;
-                    triggering_error = Some(e);
+                    triggering_error = Some(err);
                 }
             },
             Err(e) => {
@@ -193,7 +227,7 @@ pub(crate) async fn stateless_serialize_and_write_files(
         }
     }
 
-    tx.send(row_count).map_err(|_| {
+    tx.send(row_count as u64).map_err(|_| {
         internal_datafusion_err!(
             "Error encountered while sending row count back to file sink!"
         )


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to