tustvold commented on code in PR #6154:
URL: https://github.com/apache/arrow-datafusion/pull/6154#discussion_r1180501475


##########
datafusion/core/src/physical_plan/memory.rs:
##########
@@ -327,24 +326,20 @@ impl ExecutionPlan for MemoryWriteExec {
     ) -> Result<SendableRecordBatchStream> {
         let batch_count = self.batches.len();
         let data = self.input.execute(partition, context)?;
-        if batch_count >= self.input.output_partitioning().partition_count() {
-            // If the number of input partitions matches the number of 
MemTable partitions,
-            // use a lightweight implementation that doesn't utilize as many 
locks.
-            let table_partition = self.batches[partition].clone();
-            Ok(Box::pin(MemorySinkOneToOneStream::try_new(
-                table_partition,
-                data,
-                self.schema.clone(),
-            )?))
-        } else {
-            // Otherwise, use the locked implementation.
-            let table_partition = self.batches[partition % 
batch_count].clone();
-            Ok(Box::pin(MemorySinkStream::try_new(
-                table_partition,
-                data,
-                self.schema.clone(),
-            )?))
-        }
+        let schema = self.schema.clone();
+        let state = (data, self.batches[partition % batch_count].clone());
+
+        let stream = futures::stream::unfold(state, |mut state| async move {

Review Comment:
   We could easily hold the lock in the state if we wanted to; I just didn't think it warranted the added complexity. You could definitely do something like
   
   ```
   let stream = futures::stream::unfold(state, |mut state| async move {
       let locked = state.1.write_owned().await;
       loop {
           let batch = match state.0.next().await {
               Some(Ok(batch)) => batch,
               Some(Err(e)) => return Some((Err(e), state)),
               None => return None,
           };
           locked.push(batch)
       }
   });
   ```
   
   Or even
   
   
   ```
   let stream = futures::stream::unfold(state, |mut state| async move {
       let locked = state.1.write().await;
       loop {
           let batch = match state.0.next().await {
               Some(Ok(batch)) => batch,
               Some(Err(e)) => {
                   drop(locked);
                   return Some((Err(e), state))
               }
               None => {
                   drop(locked);
                   return None
               }
           };
           locked.push(batch)
       }
   });
   ```
   
   Ultimately, an uncontended lock is not going to matter for performance unless it is in a hot loop with no other branches.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to