tustvold commented on code in PR #6354:
URL: https://github.com/apache/arrow-datafusion/pull/6354#discussion_r1194278752


##########
datafusion/core/src/datasource/memory.rs:
##########
@@ -187,27 +189,54 @@ impl TableProvider for MemTable {
                 "Inserting query must have the same schema with the 
table.".to_string(),
             ));
         }
+        let sink = Arc::new(MemSink::new(self.batches.clone()));
+        Ok(Arc::new(InsertExec::new(input, sink)))
+    }
+}
 
-        if self.batches.is_empty() {
-            return Err(DataFusionError::Plan(
-                "The table must have partitions.".to_string(),
-            ));
+/// Implements for writing to a [`MemTable`]
+struct MemSink {
+    /// Target locations for writing data
+    batches: Vec<PartitionData>,
+}
+
+impl Debug for MemSink {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("MemSink")
+            .field("num_partitions", &self.batches.len())
+            .finish()
+    }
+}
+
+impl MemSink {
+    fn new(batches: Vec<PartitionData>) -> Self {
+        Self { batches }
+    }
+}
+
+#[async_trait]
+impl DataSink for MemSink {
+    async fn write_all(&self, mut data: SendableRecordBatchStream) -> 
Result<u64> {
+        let num_partitions = self.batches.len();
+
+        // buffer up the data round robin style into num_partitions
+
+        let mut new_batches = vec![vec![]; num_partitions];
+        let mut i = 0;
+        let mut row_count = 0;
+        while let Some(batch) = data.next().await.transpose()? {
+            row_count += batch.num_rows();
+            new_batches[i].push(batch);
+            i = (i + 1) % num_partitions;
         }
 
-        let input = if self.batches.len() > 1 {
-            Arc::new(RepartitionExec::try_new(
-                input,
-                Partitioning::RoundRobinBatch(self.batches.len()),
-            )?)
-        } else {
-            input
-        };
+        // write the outputs into the batches
+        for (target, mut batches) in 
self.batches.iter().zip(new_batches.into_iter()) {
+            // Append all the new batches in one go to minimize locking 
overhead
+            target.write().await.append(&mut batches);

Review Comment:
   I wonder if we can now remove the per-partition locks :thinking: 
   
   Possibly even switching to a non-async lock...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to