metesynnada commented on code in PR #6154:
URL: https://github.com/apache/arrow-datafusion/pull/6154#discussion_r1185192902


##########
datafusion/core/src/physical_plan/memory.rs:
##########
@@ -327,24 +326,25 @@ impl ExecutionPlan for MemoryWriteExec {
     ) -> Result<SendableRecordBatchStream> {
         let batch_count = self.batches.len();
         let data = self.input.execute(partition, context)?;
-        if batch_count >= self.input.output_partitioning().partition_count() {
-            // If the number of input partitions matches the number of 
MemTable partitions,
-            // use a lightweight implementation that doesn't utilize as many 
locks.
-            let table_partition = self.batches[partition].clone();
-            Ok(Box::pin(MemorySinkOneToOneStream::try_new(
-                table_partition,
-                data,
-                self.schema.clone(),
-            )?))
-        } else {
-            // Otherwise, use the locked implementation.
-            let table_partition = self.batches[partition % 
batch_count].clone();
-            Ok(Box::pin(MemorySinkStream::try_new(
-                table_partition,
-                data,
-                self.schema.clone(),
-            )?))
-        }
+        let schema = self.schema.clone();
+        let state = (data, self.batches[partition % batch_count].clone());
+
+        let stream = futures::stream::unfold(state, |mut state| async move {
+            // hold lock during the entire write
+            let mut locked = state.1.write().await;
+            loop {
+                let batch = match state.0.next().await {
+                    Some(Ok(batch)) => batch,
+                    Some(Err(e)) => {
+                        drop(locked);
+                        return Some((Err(e), state));
+                    }
+                    None => return None,
+                };
+                locked.push(batch)
+            }
+        });
+        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))

Review Comment:
   ```suggestion
          let adapter = if batch_count >= 
self.input.output_partitioning().partition_count()
           {
               // If the number of input partitions matches the number of 
MemTable partitions,
               // use a lightweight implementation that doesn't utilize as many 
locks.
               let stream = futures::stream::unfold(state, |mut state| async 
move {
                   // hold lock during the entire write
                   let mut locked = state.1.write().await;
                   loop {
                       let batch = match state.0.next().await {
                           Some(Ok(batch)) => batch,
                           Some(Err(e)) => {
                               drop(locked);
                               return Some((Err(e), state));
                           }
                           None => return None,
                       };
                       locked.push(batch)
                   }
               });
               Box::pin(RecordBatchStreamAdapter::new(schema, stream))
                   as SendableRecordBatchStream
           } else {
               let stream = futures::stream::unfold(state, |mut state| async 
move {
                   loop {
                       let batch = match state.0.next().await {
                           Some(Ok(batch)) => batch,
                           Some(Err(e)) => return Some((Err(e), state)),
                           None => return None,
                       };
                       state.1.write().await.push(batch)
                   }
               });
               Box::pin(RecordBatchStreamAdapter::new(schema, stream))
                   as SendableRecordBatchStream
           };
           Ok(adapter)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to