metesynnada commented on code in PR #7452:
URL: https://github.com/apache/arrow-datafusion/pull/7452#discussion_r1316823526


##########
datafusion/core/src/datasource/file_format/write.rs:
##########
@@ -315,13 +299,90 @@ pub(crate) async fn create_writer(
     }
 }
 
+/// Serializes a single data stream in parallel and writes to an ObjectStore
+/// concurrently. Data order is preserved. In the event of an error,
+/// the ObjectStore writer is returned to the caller in addition to an error,
+/// so that the caller may handle aborting failed writes.
+async fn serialize_rb_stream_to_object_store(
+    mut data_stream: Pin<Box<dyn RecordBatchStream + Send>>,
+    mut serializer: Box<dyn BatchSerializer>,
+    mut writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
+) -> std::result::Result<
+    (
+        Box<dyn BatchSerializer>,
+        AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
+        u64,
+    ),
+    (
+        AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
+        DataFusionError,
+    ),
+> {
+    let mut row_count = 0;
+    // Not using JoinSet here since we want to ultimately write to ObjectStore preserving file order
+    let mut serialize_tasks: Vec<JoinHandle<Result<(usize, Bytes), DataFusionError>>> =
+        Vec::new();
+    while let Some(maybe_batch) = data_stream.next().await {
+        let mut serializer_clone = match serializer.duplicate() {
+            Ok(s) => s,
+            Err(e) => {
+                // Surface the real cause rather than a generic message
+                return Err((
+                    writer,
+                    DataFusionError::Internal(format!(
+                        "Error duplicating serializer: {e}"
+                    )),
+                ))
+            }
+        };
+        serialize_tasks.push(task::spawn(async move {
+            let batch = maybe_batch?;
+            let num_rows = batch.num_rows();
+            let bytes = serializer_clone.serialize(batch).await?;
+            Ok((num_rows, bytes))
+        }));
+    }
+    for serialize_result in serialize_tasks {
+        let result = serialize_result.await;
+        match result {
+            Ok(res) => {
+                let (cnt, bytes) = match res {
+                    Ok(r) => r,
+                    Err(e) => return Err((writer, e)),
+                };
+                row_count += cnt;
+                match writer.write_all(&bytes).await {
+                    Ok(_) => (),
+                    Err(e) => {
+                        return Err((
+                            writer,
+                            DataFusionError::Internal(format!(
+                                "Error writing to object store: {e}"
+                            )),
+                        ))
+                    }
+                };
+            }
+            Err(e) => {
+                // The spawned serialization task panicked or was cancelled
+                return Err((
+                    writer,
+                    DataFusionError::Internal(format!(
+                        "Serialization task failed: {e}"
+                    )),
+                ))
+            }
+        }
+    }
+
+    Ok((serializer, writer, row_count as u64))
+}

Review Comment:
   Thank you for your feedback on the code. I've taken a closer look at the use 
of `yield_now().await` and its implications on task execution. Let's clarify a 
few points:
   
   **1- Purpose of `yield_now().await`:** The `yield_now().await` call tells 
the Tokio runtime, "I'm temporarily done with my current work, and you can run 
other tasks if they're ready." It doesn't inherently force tasks to run in 
sequence; instead, it provides an opportunity for the runtime to switch to 
another task. If no other task is ready, the current task will continue almost 
immediately.
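   To make the first point concrete, here is a minimal sketch (not from the PR, task names `a`/`b` are made up) showing that on a single-threaded runtime, `yield_now().await` only creates interleaving opportunities; the two tasks still share one thread:
   
   ```rust
   use tokio::task;
   
   fn main() {
       // current_thread flavor: all tasks run on the calling thread
       let rt = tokio::runtime::Builder::new_current_thread()
           .enable_all()
           .build()
           .unwrap();
       rt.block_on(async {
           let a = task::spawn(async {
               for i in 0..3 {
                   println!("A{i}");
                   // Cooperative yield point: the scheduler may now poll B
                   task::yield_now().await;
               }
           });
           let b = task::spawn(async {
               for i in 0..3 {
                   println!("B{i}");
                   task::yield_now().await;
               }
           });
           let _ = tokio::join!(a, b);
       });
   }
   ```
   
   Both tasks make progress and their output interleaves, but removing the yields would not change correctness here, only scheduling granularity.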
   
   **2- Parallelism vs. Concurrency:** Tokio's work-stealing scheduler allows 
tasks to run concurrently by default. True parallel execution requires multiple 
threads. If your profiling runtime is configured to use a single thread, tasks 
will run sequentially, even if they're spawned concurrently. The 
`yield_now().await` call can influence the order of task execution but doesn't 
force sequential execution by itself.
   
   Based on what you've said, I would suggest checking whether the profiling 
runtime is configured with a single worker thread. Could you also share your 
runtime configuration?
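   One hedged way to check this empirically (a sketch I put together, not code from the PR): spawn a batch of trivial tasks on an explicitly multi-threaded runtime and count how many distinct worker threads actually poll them. On a `current_thread` runtime the count is always 1:
   
   ```rust
   use std::collections::HashSet;
   use std::sync::{Arc, Mutex};
   
   fn main() {
       // Explicit multi-threaded runtime with 4 workers
       let rt = tokio::runtime::Builder::new_multi_thread()
           .worker_threads(4)
           .enable_all()
           .build()
           .unwrap();
       let ids = Arc::new(Mutex::new(HashSet::new()));
       rt.block_on(async {
           let mut handles = Vec::new();
           for _ in 0..16 {
               let ids = Arc::clone(&ids);
               handles.push(tokio::spawn(async move {
                   // Record which worker thread polled this task
                   ids.lock().unwrap().insert(std::thread::current().id());
                   tokio::task::yield_now().await;
               }));
           }
           for h in handles {
               h.await.unwrap();
           }
       });
       println!("distinct worker threads used: {}", ids.lock().unwrap().len());
   }
   ```
   
   Note the count can still be 1 even on a multi-threaded runtime if the tasks are too short for work-stealing to kick in, so treat this as a diagnostic hint rather than proof.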



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.