metesynnada commented on code in PR #6236:
URL: https://github.com/apache/arrow-datafusion/pull/6236#discussion_r1186001426
##########
datafusion/core/src/physical_plan/memory.rs:
##########
@@ -327,43 +327,29 @@ impl ExecutionPlan for MemoryWriteExec {
let batch_count = self.batches.len();
let data = self.input.execute(partition, context)?;
let schema = self.schema.clone();
- let state = (data, self.batches[partition % batch_count].clone());
-
- let adapter = if batch_count >=
self.input.output_partitioning().partition_count()
- {
- // If the number of input partitions matches the number of
MemTable partitions,
- // use a lightweight implementation that doesn't utilize as many
locks.
- let stream = futures::stream::unfold(state, |mut state| async move
{
- // hold lock during the entire write
- let mut locked = state.1.write().await;
- loop {
- let batch = match state.0.next().await {
- Some(Ok(batch)) => batch,
- Some(Err(e)) => {
- drop(locked);
- return Some((Err(e), state));
- }
- None => return None,
- };
- locked.push(batch)
- }
- });
- Box::pin(RecordBatchStreamAdapter::new(schema, stream))
- as SendableRecordBatchStream
- } else {
- let stream = futures::stream::unfold(state, |mut state| async move
{
- loop {
- let batch = match state.0.next().await {
- Some(Ok(batch)) => batch,
- Some(Err(e)) => return Some((Err(e), state)),
- None => return None,
- };
- state.1.write().await.push(batch)
- }
- });
- Box::pin(RecordBatchStreamAdapter::new(schema, stream))
- as SendableRecordBatchStream
+ let state = StreamState {
+ data,
+ buffer: vec![],
+ batch: self.batches[partition % batch_count].clone(),
};
+
+ let stream = futures::stream::unfold(state, |mut state| async move {
+ loop {
+ match state.data.next().await {
+ Some(Ok(batch)) => state.buffer.push(batch),
+ Some(Err(e)) => return Some((Err(e), state)),
+ None => {
+ // stream is done, transfer all data to target
PartitionData
+ state.batch.write().await.append(&mut state.buffer);
Review Comment:
I think this is logical: each stream appends its buffered batches to the target
partition exactly once, regardless of the partition counts.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]