This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 82fd6a7de3 Move execute_input_stream (#11449)
82fd6a7de3 is described below

commit 82fd6a7de310fef4e365c333b0f7fc2a3e4ed12e
Author: Berkay Şahin <[email protected]>
AuthorDate: Tue Jul 16 23:34:01 2024 +0300

    Move execute_input_stream (#11449)
---
 datafusion/physical-plan/src/insert.rs | 77 ++++-----------------------
 datafusion/physical-plan/src/lib.rs    | 97 ++++++++++++++++++++++++++++++++--
 2 files changed, 103 insertions(+), 71 deletions(-)

diff --git a/datafusion/physical-plan/src/insert.rs 
b/datafusion/physical-plan/src/insert.rs
index 1c21991d93..5cd864125e 100644
--- a/datafusion/physical-plan/src/insert.rs
+++ b/datafusion/physical-plan/src/insert.rs
@@ -23,8 +23,8 @@ use std::fmt::Debug;
 use std::sync::Arc;
 
 use super::{
-    DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, 
Partitioning,
-    PlanProperties, SendableRecordBatchStream,
+    execute_input_stream, DisplayAs, DisplayFormatType, ExecutionPlan,
+    ExecutionPlanProperties, Partitioning, PlanProperties, 
SendableRecordBatchStream,
 };
 use crate::metrics::MetricsSet;
 use crate::stream::RecordBatchStreamAdapter;
@@ -33,7 +33,7 @@ use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 use arrow_array::{ArrayRef, UInt64Array};
 use arrow_schema::{DataType, Field, Schema};
-use datafusion_common::{exec_err, internal_err, Result};
+use datafusion_common::{internal_err, Result};
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{
     Distribution, EquivalenceProperties, PhysicalSortRequirement,
@@ -120,46 +120,6 @@ impl DataSinkExec {
         }
     }
 
-    fn execute_input_stream(
-        &self,
-        partition: usize,
-        context: Arc<TaskContext>,
-    ) -> Result<SendableRecordBatchStream> {
-        let input_stream = self.input.execute(partition, context)?;
-
-        debug_assert_eq!(
-            self.sink_schema.fields().len(),
-            self.input.schema().fields().len()
-        );
-
-        // Find input columns that may violate the not null constraint.
-        let risky_columns: Vec<_> = self
-            .sink_schema
-            .fields()
-            .iter()
-            .zip(self.input.schema().fields().iter())
-            .enumerate()
-            .filter_map(|(i, (sink_field, input_field))| {
-                if !sink_field.is_nullable() && input_field.is_nullable() {
-                    Some(i)
-                } else {
-                    None
-                }
-            })
-            .collect();
-
-        if risky_columns.is_empty() {
-            Ok(input_stream)
-        } else {
-            // Check not null constraint on the input stream
-            Ok(Box::pin(RecordBatchStreamAdapter::new(
-                Arc::clone(&self.sink_schema),
-                input_stream
-                    .map(move |batch| check_not_null_contraits(batch?, 
&risky_columns)),
-            )))
-        }
-    }
-
     /// Input execution plan
     pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
         &self.input
@@ -269,7 +229,12 @@ impl ExecutionPlan for DataSinkExec {
         if partition != 0 {
             return internal_err!("DataSinkExec can only be called on partition 
0!");
         }
-        let data = self.execute_input_stream(0, Arc::clone(&context))?;
+        let data = execute_input_stream(
+            Arc::clone(&self.input),
+            Arc::clone(&self.sink_schema),
+            0,
+            Arc::clone(&context),
+        )?;
 
         let count_schema = Arc::clone(&self.count_schema);
         let sink = Arc::clone(&self.sink);
@@ -314,27 +279,3 @@ fn make_count_schema() -> SchemaRef {
         false,
     )]))
 }
-
-fn check_not_null_contraits(
-    batch: RecordBatch,
-    column_indices: &Vec<usize>,
-) -> Result<RecordBatch> {
-    for &index in column_indices {
-        if batch.num_columns() <= index {
-            return exec_err!(
-                "Invalid batch column count {} expected > {}",
-                batch.num_columns(),
-                index
-            );
-        }
-
-        if batch.column(index).null_count() > 0 {
-            return exec_err!(
-                "Invalid batch column at '{}' has null but schema specifies 
non-nullable",
-                index
-            );
-        }
-    }
-
-    Ok(batch)
-}
diff --git a/datafusion/physical-plan/src/lib.rs 
b/datafusion/physical-plan/src/lib.rs
index f3a709ff76..dc736993a4 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -36,13 +36,13 @@ use arrow::datatypes::SchemaRef;
 use arrow::ipc::reader::FileReader;
 use arrow::record_batch::RecordBatch;
 use datafusion_common::config::ConfigOptions;
-use datafusion_common::{exec_datafusion_err, Result};
+use datafusion_common::{exec_datafusion_err, exec_err, Result};
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{
     EquivalenceProperties, LexOrdering, PhysicalSortExpr, 
PhysicalSortRequirement,
 };
 
-use futures::stream::TryStreamExt;
+use futures::stream::{StreamExt, TryStreamExt};
 use log::debug;
 use tokio::sync::mpsc::Sender;
 use tokio::task::JoinSet;
@@ -97,7 +97,7 @@ pub use datafusion_physical_expr::{
 // Backwards compatibility
 use crate::common::IPCWriter;
 pub use crate::stream::EmptyRecordBatchStream;
-use crate::stream::RecordBatchReceiverStream;
+use crate::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter};
 use datafusion_execution::disk_manager::RefCountedTempFile;
 use datafusion_execution::memory_pool::human_readable_size;
 pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
@@ -805,6 +805,97 @@ pub fn execute_stream_partitioned(
     Ok(streams)
 }
 
+/// Executes an input stream and ensures that the resulting stream adheres to
+/// the `not null` constraints specified in the `sink_schema`.
+///
+/// # Arguments
+///
+/// * `input` - An execution plan
+/// * `sink_schema` - The schema to be applied to the output stream
+/// * `partition` - The partition index to be executed
+/// * `context` - The task context
+///
+/// # Returns
+///
+/// * `Result<SendableRecordBatchStream>` - A stream of `RecordBatch`es if 
successful
+///
+/// This function first executes the given input plan for the specified 
partition
+/// and context. It then checks if there are any columns in the input that 
might
+/// violate the `not null` constraints specified in the `sink_schema`. If 
there are
+/// such columns, it wraps the resulting stream to enforce the `not null` 
constraints
+/// by invoking the `check_not_null_contraits` function on each batch of the 
stream.
+pub fn execute_input_stream(
+    input: Arc<dyn ExecutionPlan>,
+    sink_schema: SchemaRef,
+    partition: usize,
+    context: Arc<TaskContext>,
+) -> Result<SendableRecordBatchStream> {
+    let input_stream = input.execute(partition, context)?;
+
+    debug_assert_eq!(sink_schema.fields().len(), 
input.schema().fields().len());
+
+    // Find input columns that may violate the not null constraint.
+    let risky_columns: Vec<_> = sink_schema
+        .fields()
+        .iter()
+        .zip(input.schema().fields().iter())
+        .enumerate()
+        .filter_map(|(idx, (sink_field, input_field))| {
+            (!sink_field.is_nullable() && 
input_field.is_nullable()).then_some(idx)
+        })
+        .collect();
+
+    if risky_columns.is_empty() {
+        Ok(input_stream)
+    } else {
+        // Check not null constraint on the input stream
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            sink_schema,
+            input_stream
+                .map(move |batch| check_not_null_contraits(batch?, 
&risky_columns)),
+        )))
+    }
+}
+
+/// Checks a `RecordBatch` for `not null` constraints on specified columns.
+///
+/// # Arguments
+///
+/// * `batch` - The `RecordBatch` to be checked
+/// * `column_indices` - A vector of column indices that should be checked for
+///   `not null` constraints.
+///
+/// # Returns
+///
+/// * `Result<RecordBatch>` - The original `RecordBatch` if all constraints 
are met
+///
+/// This function iterates over the specified column indices and ensures that 
none
+/// of the columns contain null values. If any column contains null values, an 
error
+/// is returned.
+pub fn check_not_null_contraits(
+    batch: RecordBatch,
+    column_indices: &Vec<usize>,
+) -> Result<RecordBatch> {
+    for &index in column_indices {
+        if batch.num_columns() <= index {
+            return exec_err!(
+                "Invalid batch column count {} expected > {}",
+                batch.num_columns(),
+                index
+            );
+        }
+
+        if batch.column(index).null_count() > 0 {
+            return exec_err!(
+                "Invalid batch column at '{}' has null but schema specifies 
non-nullable",
+                index
+            );
+        }
+    }
+
+    Ok(batch)
+}
+
 /// Utility function yielding a string representation of the given 
[`ExecutionPlan`].
 pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
     let formatted = displayable(plan.as_ref()).indent(true).to_string();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to