alamb commented on code in PR #6347:
URL: https://github.com/apache/arrow-datafusion/pull/6347#discussion_r1192799768
##########
datafusion/core/src/datasource/memory.rs:
##########
@@ -164,50 +166,58 @@ impl TableProvider for MemTable {
)?))
}
- /// Inserts the execution results of a given [`ExecutionPlan`] into this
[`MemTable`].
- /// The [`ExecutionPlan`] must have the same schema as this [`MemTable`].
- ///
- /// # Arguments
- ///
- /// * `state` - The [`SessionState`] containing the context for executing
the plan.
- /// * `input` - The [`ExecutionPlan`] to execute and insert.
- ///
- /// # Returns
- ///
- /// * A `Result` indicating success or failure.
- async fn insert_into(
- &self,
- _state: &SessionState,
- input: Arc<dyn ExecutionPlan>,
- ) -> Result<Arc<dyn ExecutionPlan>> {
- // Create a physical plan from the logical plan.
- // Check that the schema of the plan matches the schema of this table.
- if !input.schema().eq(&self.schema) {
- return Err(DataFusionError::Plan(
- "Inserting query must have the same schema with the
table.".to_string(),
+ async fn write_to(&self) -> Result<Arc<dyn DataSink>> {
+ if self.batches.is_empty() {
+ return Err(DataFusionError::Internal(
+ "Can not insert into table without partitions.".to_string(),
));
}
+ Ok(Arc::new(MemSink::new(self.batches.clone())))
+ }
+}
- if self.batches.is_empty() {
- return Err(DataFusionError::Plan(
- "The table must have partitions.".to_string(),
- ));
+/// Implements for writing to a [`MemTable`]
+struct MemSink {
+ /// Target locations for writing data
+ batches: Vec<PartitionData>,
+}
+
+impl Debug for MemSink {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("MemSink")
+ .field("num_partitions", &self.batches.len())
+ .finish()
+ }
+}
+
+impl MemSink {
+ fn new(batches: Vec<PartitionData>) -> Self {
+ Self { batches }
+ }
+}
+
+#[async_trait]
+impl DataSink for MemSink {
+ async fn write_all(&self, mut data: SendableRecordBatchStream) ->
Result<u64> {
+ let num_partitions = self.batches.len();
+
+ // buffer up the data round robin style into num_partitions new buffers
+ let mut new_batches = vec![vec![]; num_partitions];
Review Comment:
I am quite pleased that, by following @tustvold's guidance and pushing the
partitioning choice into the `DataSink` implementations, the logic becomes
quite a bit simpler and more flexible, with seemingly no performance penalty.
##########
datafusion/core/src/physical_plan/insert.rs:
##########
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for writing data to [`DataSink`]s
+
+use super::expressions::PhysicalSortExpr;
+use super::{
+ DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
Statistics,
+};
+use crate::datasource::sink::DataSink;
+use crate::error::Result;
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use arrow_array::{ArrayRef, UInt64Array};
+use arrow_schema::{DataType, Field, Schema};
+use core::fmt;
+use futures::StreamExt;
+use std::any::Any;
+use std::sync::Arc;
+
+use crate::execution::context::TaskContext;
+use crate::physical_plan::stream::RecordBatchStreamAdapter;
+use crate::physical_plan::Distribution;
+use datafusion_common::DataFusionError;
+
+/// Execution plan for writing record batches to a [`DataSink`]
Review Comment:
This is basically a generic version of `MemoryWriteExec` that delegates the
actual writing to a `dyn DataSink`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]