This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 82fd6a7de3 Move execute_input_stream (#11449)
82fd6a7de3 is described below
commit 82fd6a7de310fef4e365c333b0f7fc2a3e4ed12e
Author: Berkay Şahin <[email protected]>
AuthorDate: Tue Jul 16 23:34:01 2024 +0300
Move execute_input_stream (#11449)
---
datafusion/physical-plan/src/insert.rs | 77 ++++-----------------------
datafusion/physical-plan/src/lib.rs | 97 ++++++++++++++++++++++++++++++++--
2 files changed, 103 insertions(+), 71 deletions(-)
diff --git a/datafusion/physical-plan/src/insert.rs b/datafusion/physical-plan/src/insert.rs
index 1c21991d93..5cd864125e 100644
--- a/datafusion/physical-plan/src/insert.rs
+++ b/datafusion/physical-plan/src/insert.rs
@@ -23,8 +23,8 @@ use std::fmt::Debug;
use std::sync::Arc;
use super::{
- DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning,
- PlanProperties, SendableRecordBatchStream,
+ execute_input_stream, DisplayAs, DisplayFormatType, ExecutionPlan,
+ ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream,
};
use crate::metrics::MetricsSet;
use crate::stream::RecordBatchStreamAdapter;
@@ -33,7 +33,7 @@ use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use arrow_array::{ArrayRef, UInt64Array};
use arrow_schema::{DataType, Field, Schema};
-use datafusion_common::{exec_err, internal_err, Result};
+use datafusion_common::{internal_err, Result};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{
Distribution, EquivalenceProperties, PhysicalSortRequirement,
@@ -120,46 +120,6 @@ impl DataSinkExec {
}
}
- fn execute_input_stream(
- &self,
- partition: usize,
- context: Arc<TaskContext>,
- ) -> Result<SendableRecordBatchStream> {
- let input_stream = self.input.execute(partition, context)?;
-
- debug_assert_eq!(
- self.sink_schema.fields().len(),
- self.input.schema().fields().len()
- );
-
- // Find input columns that may violate the not null constraint.
- let risky_columns: Vec<_> = self
- .sink_schema
- .fields()
- .iter()
- .zip(self.input.schema().fields().iter())
- .enumerate()
- .filter_map(|(i, (sink_field, input_field))| {
- if !sink_field.is_nullable() && input_field.is_nullable() {
- Some(i)
- } else {
- None
- }
- })
- .collect();
-
- if risky_columns.is_empty() {
- Ok(input_stream)
- } else {
- // Check not null constraint on the input stream
- Ok(Box::pin(RecordBatchStreamAdapter::new(
- Arc::clone(&self.sink_schema),
- input_stream
- .map(move |batch| check_not_null_contraits(batch?, &risky_columns)),
- )))
- }
- }
-
/// Input execution plan
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
&self.input
@@ -269,7 +229,12 @@ impl ExecutionPlan for DataSinkExec {
if partition != 0 {
return internal_err!("DataSinkExec can only be called on partition 0!");
}
- let data = self.execute_input_stream(0, Arc::clone(&context))?;
+ let data = execute_input_stream(
+ Arc::clone(&self.input),
+ Arc::clone(&self.sink_schema),
+ 0,
+ Arc::clone(&context),
+ )?;
let count_schema = Arc::clone(&self.count_schema);
let sink = Arc::clone(&self.sink);
@@ -314,27 +279,3 @@ fn make_count_schema() -> SchemaRef {
false,
)]))
}
-
-fn check_not_null_contraits(
- batch: RecordBatch,
- column_indices: &Vec<usize>,
-) -> Result<RecordBatch> {
- for &index in column_indices {
- if batch.num_columns() <= index {
- return exec_err!(
- "Invalid batch column count {} expected > {}",
- batch.num_columns(),
- index
- );
- }
-
- if batch.column(index).null_count() > 0 {
- return exec_err!(
- "Invalid batch column at '{}' has null but schema specifies non-nullable",
- index
- );
- }
- }
-
- Ok(batch)
-}
diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs
index f3a709ff76..dc736993a4 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -36,13 +36,13 @@ use arrow::datatypes::SchemaRef;
use arrow::ipc::reader::FileReader;
use arrow::record_batch::RecordBatch;
use datafusion_common::config::ConfigOptions;
-use datafusion_common::{exec_datafusion_err, Result};
+use datafusion_common::{exec_datafusion_err, exec_err, Result};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{
EquivalenceProperties, LexOrdering, PhysicalSortExpr, PhysicalSortRequirement,
};
-use futures::stream::TryStreamExt;
+use futures::stream::{StreamExt, TryStreamExt};
use log::debug;
use tokio::sync::mpsc::Sender;
use tokio::task::JoinSet;
@@ -97,7 +97,7 @@ pub use datafusion_physical_expr::{
// Backwards compatibility
use crate::common::IPCWriter;
pub use crate::stream::EmptyRecordBatchStream;
-use crate::stream::RecordBatchReceiverStream;
+use crate::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter};
use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::memory_pool::human_readable_size;
pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
@@ -805,6 +805,97 @@ pub fn execute_stream_partitioned(
Ok(streams)
}
+/// Executes an input stream and ensures that the resulting stream adheres to
+/// the `not null` constraints specified in the `sink_schema`.
+///
+/// # Arguments
+///
+/// * `input` - An execution plan
+/// * `sink_schema` - The schema to be applied to the output stream
+/// * `partition` - The partition index to be executed
+/// * `context` - The task context
+///
+/// # Returns
+///
+/// * `Result<SendableRecordBatchStream>` - A stream of `RecordBatch`es if successful
+///
+/// This function first executes the given input plan for the specified partition
+/// and context. It then checks if there are any columns in the input that might
+/// violate the `not null` constraints specified in the `sink_schema`. If there are
+/// such columns, it wraps the resulting stream to enforce the `not null` constraints
+/// by invoking the `check_not_null_contraits` function on each batch of the stream.
+pub fn execute_input_stream(
+ input: Arc<dyn ExecutionPlan>,
+ sink_schema: SchemaRef,
+ partition: usize,
+ context: Arc<TaskContext>,
+) -> Result<SendableRecordBatchStream> {
+ let input_stream = input.execute(partition, context)?;
+
+ debug_assert_eq!(sink_schema.fields().len(), input.schema().fields().len());
+
+ // Find input columns that may violate the not null constraint.
+ let risky_columns: Vec<_> = sink_schema
+ .fields()
+ .iter()
+ .zip(input.schema().fields().iter())
+ .enumerate()
+ .filter_map(|(idx, (sink_field, input_field))| {
+ (!sink_field.is_nullable() && input_field.is_nullable()).then_some(idx)
+ })
+ .collect();
+
+ if risky_columns.is_empty() {
+ Ok(input_stream)
+ } else {
+ // Check not null constraint on the input stream
+ Ok(Box::pin(RecordBatchStreamAdapter::new(
+ sink_schema,
+ input_stream
+ .map(move |batch| check_not_null_contraits(batch?, &risky_columns)),
+ )))
+ }
+}
+
+/// Checks a `RecordBatch` for `not null` constraints on specified columns.
+///
+/// # Arguments
+///
+/// * `batch` - The `RecordBatch` to be checked
+/// * `column_indices` - A vector of column indices that should be checked for
+/// `not null` constraints.
+///
+/// # Returns
+///
+/// * `Result<RecordBatch>` - The original `RecordBatch` if all constraints are met
+///
+/// This function iterates over the specified column indices and ensures that none
+/// of the columns contain null values. If any column contains null values, an error
+/// is returned.
+pub fn check_not_null_contraits(
+ batch: RecordBatch,
+ column_indices: &Vec<usize>,
+) -> Result<RecordBatch> {
+ for &index in column_indices {
+ if batch.num_columns() <= index {
+ return exec_err!(
+ "Invalid batch column count {} expected > {}",
+ batch.num_columns(),
+ index
+ );
+ }
+
+ if batch.column(index).null_count() > 0 {
+ return exec_err!(
+ "Invalid batch column at '{}' has null but schema specifies non-nullable",
+ index
+ );
+ }
+ }
+
+ Ok(batch)
+}
+
/// Utility function yielding a string representation of the given [`ExecutionPlan`].
pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
let formatted = displayable(plan.as_ref()).indent(true).to_string();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]