alamb commented on code in PR #6154:
URL: https://github.com/apache/arrow-datafusion/pull/6154#discussion_r1185368790


##########
datafusion/core/src/physical_plan/memory.rs:
##########
@@ -327,24 +326,25 @@ impl ExecutionPlan for MemoryWriteExec {
     ) -> Result<SendableRecordBatchStream> {
         let batch_count = self.batches.len();
         let data = self.input.execute(partition, context)?;
-        if batch_count >= self.input.output_partitioning().partition_count() {
-            // If the number of input partitions matches the number of 
MemTable partitions,
-            // use a lightweight implementation that doesn't utilize as many 
locks.
-            let table_partition = self.batches[partition].clone();
-            Ok(Box::pin(MemorySinkOneToOneStream::try_new(
-                table_partition,
-                data,
-                self.schema.clone(),
-            )?))
-        } else {
-            // Otherwise, use the locked implementation.
-            let table_partition = self.batches[partition % 
batch_count].clone();
-            Ok(Box::pin(MemorySinkStream::try_new(
-                table_partition,
-                data,
-                self.schema.clone(),
-            )?))
-        }
+        let schema = self.schema.clone();
+        let state = (data, self.batches[partition % batch_count].clone());
+
+        let stream = futures::stream::unfold(state, |mut state| async move {
+            // hold lock during the entire write
+            let mut locked = state.1.write().await;
+            loop {
+                let batch = match state.0.next().await {
+                    Some(Ok(batch)) => batch,
+                    Some(Err(e)) => {
+                        drop(locked);
+                        return Some((Err(e), state));
+                    }
+                    None => return None,
+                };
+                locked.push(batch)
+            }
+        });
+        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))

Review Comment:
   After thinking about this more, I don't understand why we are locking at all 
in the loop -- if we are really worried about the locking performance we can 
just buffer the batches and copy them at the end of the stream. This should 
also keep the code simpler.  I'll make a follow on PR with a proposal



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to