alamb commented on code in PR #7452:
URL: https://github.com/apache/arrow-datafusion/pull/7452#discussion_r1320225484


##########
datafusion/core/src/datasource/file_format/write.rs:
##########
@@ -315,58 +300,237 @@ pub(crate) async fn create_writer(
     }
 }
 
+/// Serializes a single data stream in parallel and writes to an ObjectStore
+/// concurrently. Data order is preserved. In the event of an error,
+/// the ObjectStore writer is returned to the caller in addition to an error,
+/// so that the caller may handle aborting failed writes.
+async fn serialize_rb_stream_to_object_store(

Review Comment:
   I tried to rewrite this as a `futures::stream` computation using `buffered`, 
but I got stuck on a "higher-ranked lifetime error", so I think this is 
about as good as it is going to get.



##########
datafusion/core/src/datasource/file_format/write.rs:
##########
@@ -315,58 +300,237 @@ pub(crate) async fn create_writer(
     }
 }
 
+/// Serializes a single data stream in parallel and writes to an ObjectStore
+/// concurrently. Data order is preserved. In the event of an error,
+/// the ObjectStore writer is returned to the caller in addition to an error,
+/// so that the caller may handle aborting failed writes.
+async fn serialize_rb_stream_to_object_store(
+    mut data_stream: Pin<Box<dyn RecordBatchStream + Send>>,
+    mut serializer: Box<dyn BatchSerializer>,
+    mut writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
+    unbounded_input: bool,
+) -> std::result::Result<
+    (
+        Box<dyn BatchSerializer>,
+        AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
+        u64,
+    ),
+    (
+        AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
+        DataFusionError,
+    ),
+> {
+    let (tx, mut rx) =
+        mpsc::channel::<JoinHandle<Result<(usize, Bytes), DataFusionError>>>(100);
+
+    let serialize_task = tokio::spawn(async move {
+        while let Some(maybe_batch) = data_stream.next().await {
+            match serializer.duplicate() {
+                Ok(mut serializer_clone) => {
+                    let handle = tokio::spawn(async move {
+                        let batch = maybe_batch?;
+                        let num_rows = batch.num_rows();
+                        let bytes = serializer_clone.serialize(batch).await?;
+                        Ok((num_rows, bytes))
+                    });
+                    tx.send(handle).await.map_err(|_| {
+                        DataFusionError::Internal(
+                            "Unknown error writing to object store".into(),
+                        )
+                    })?;
+                    if unbounded_input {
+                        tokio::task::yield_now().await;
+                    }
+                }
+                Err(_) => {
+                    return Err(DataFusionError::Internal(
+                        "Unknown error writing to object store".into(),
+                    ))
+                }
+            }
+        }
+        Ok(serializer)
+    });
+
+    let mut row_count = 0;
+    while let Some(handle) = rx.recv().await {
+        match handle.await {
+            Ok(Ok((cnt, bytes))) => {
+                match writer.write_all(&bytes).await {
+                    Ok(_) => (),
+                    Err(_) => {
+                        return Err((
+                            writer,
+                            DataFusionError::Internal(
+                                "Unknown error writing to object store".into(),
+                            ),
+                        ))
+                    }

Review Comment:
   I think this is a real error, and it would be nice to pass the message along 
to the caller.
   
   ```suggestion
                       Err(e) => {
                           return Err((
                               writer,
                               DataFusionError::Execution(
                                   format!("Error writing to object store: {e}"),
                               ),
                           ))
                       }
   ```
   



##########
datafusion/core/src/datasource/file_format/write.rs:
##########
@@ -315,58 +300,237 @@ pub(crate) async fn create_writer(
     }
 }
 
+/// Serializes a single data stream in parallel and writes to an ObjectStore
+/// concurrently. Data order is preserved. In the event of an error,
+/// the ObjectStore writer is returned to the caller in addition to an error,
+/// so that the caller may handle aborting failed writes.
+async fn serialize_rb_stream_to_object_store(
+    mut data_stream: Pin<Box<dyn RecordBatchStream + Send>>,
+    mut serializer: Box<dyn BatchSerializer>,
+    mut writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
+    unbounded_input: bool,
+) -> std::result::Result<
+    (
+        Box<dyn BatchSerializer>,
+        AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
+        u64,
+    ),
+    (
+        AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
+        DataFusionError,
+    ),
+> {
+    let (tx, mut rx) =
+        mpsc::channel::<JoinHandle<Result<(usize, Bytes), DataFusionError>>>(100);
+
+    let serialize_task = tokio::spawn(async move {
+        while let Some(maybe_batch) = data_stream.next().await {
+            match serializer.duplicate() {
+                Ok(mut serializer_clone) => {
+                    let handle = tokio::spawn(async move {
+                        let batch = maybe_batch?;
+                        let num_rows = batch.num_rows();
+                        let bytes = serializer_clone.serialize(batch).await?;
+                        Ok((num_rows, bytes))
+                    });
+                    tx.send(handle).await.map_err(|_| {
+                        DataFusionError::Internal(
+                            "Unknown error writing to object store".into(),
+                        )
+                    })?;
+                    if unbounded_input {
+                        tokio::task::yield_now().await;
+                    }
+                }
+                Err(_) => {
+                    return Err(DataFusionError::Internal(
+                        "Unknown error writing to object store".into(),
+                    ))
+                }
+            }
+        }
+        Ok(serializer)
+    });
+
+    let mut row_count = 0;
+    while let Some(handle) = rx.recv().await {
+        match handle.await {
+            Ok(Ok((cnt, bytes))) => {
+                match writer.write_all(&bytes).await {
+                    Ok(_) => (),
+                    Err(_) => {
+                        return Err((
+                            writer,
+                            DataFusionError::Internal(
+                                "Unknown error writing to object store".into(),
+                            ),
+                        ))
+                    }
+                };
+                row_count += cnt;
+            }
+            Ok(Err(e)) => {
+                // Return the writer along with the error
+                return Err((writer, e));
+            }
+            Err(_) => {
+                // Handle task panic or cancellation
+                return Err((
+                    writer,
+                    DataFusionError::Internal(
+                        "Serialization task panicked or was cancelled".into(),
+                    ),
+                ));
+            }

Review Comment:
   Likewise here: I think this is a real error that could be exposed to the user.
   
   ```suggestion
               Err(e) => {
                   // Handle task panic or cancellation
                   return Err((
                       writer,
                       DataFusionError::Execution(
                           format!("Serialization task panicked or was cancelled: {e}")
                       ),
                   ));
               }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to