This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 258a7cd1cb Support InsertInto Sorted ListingTable (#7743)
258a7cd1cb is described below

commit 258a7cd1cbf20d8f633db6e608649b98dde8b5ce
Author: Devin D'Angelo <[email protected]>
AuthorDate: Sun Oct 8 13:04:35 2023 -0400

    Support InsertInto Sorted ListingTable (#7743)
    
    * enable insert into sorted listing table
    
    * add check for single file table
    
    * improve test to verify both order conditions
    
    * address feedback
---
 datafusion/core/src/datasource/file_format/csv.rs  | 10 ++++-
 datafusion/core/src/datasource/file_format/json.rs |  9 +++-
 datafusion/core/src/datasource/file_format/mod.rs  |  3 +-
 .../core/src/datasource/file_format/parquet.rs     | 10 ++++-
 datafusion/core/src/datasource/listing/table.rs    | 48 +++++++++++++++-------
 datafusion/core/src/datasource/memory.rs           |  1 +
 datafusion/core/src/physical_planner.rs            |  2 +-
 datafusion/physical-plan/src/insert.rs             | 19 ++++++---
 .../sqllogictest/test_files/insert_to_external.slt | 42 +++++++++++++++++++
 9 files changed, 117 insertions(+), 27 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/csv.rs 
b/datafusion/core/src/datasource/file_format/csv.rs
index c3295042b5..4c625b7ed7 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -29,7 +29,7 @@ use arrow::{self, datatypes::SchemaRef};
 use arrow_array::RecordBatch;
 use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType};
 use datafusion_execution::TaskContext;
-use datafusion_physical_expr::PhysicalExpr;
+use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
 
 use async_trait::async_trait;
 use bytes::{Buf, Bytes};
@@ -263,6 +263,7 @@ impl FileFormat for CsvFormat {
         input: Arc<dyn ExecutionPlan>,
         _state: &SessionState,
         conf: FileSinkConfig,
+        order_requirements: Option<Vec<PhysicalSortRequirement>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         if conf.overwrite {
             return not_impl_err!("Overwrites are not implemented yet for CSV");
@@ -275,7 +276,12 @@ impl FileFormat for CsvFormat {
         let sink_schema = conf.output_schema().clone();
         let sink = Arc::new(CsvSink::new(conf));
 
-        Ok(Arc::new(FileSinkExec::new(input, sink, sink_schema)) as _)
+        Ok(Arc::new(FileSinkExec::new(
+            input,
+            sink,
+            sink_schema,
+            order_requirements,
+        )) as _)
     }
 
     fn file_type(&self) -> FileType {
diff --git a/datafusion/core/src/datasource/file_format/json.rs 
b/datafusion/core/src/datasource/file_format/json.rs
index 96fd4daa2d..6c260b9802 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -24,6 +24,7 @@ use datafusion_common::not_impl_err;
 use datafusion_common::DataFusionError;
 use datafusion_common::FileType;
 use datafusion_execution::TaskContext;
+use datafusion_physical_expr::PhysicalSortRequirement;
 use rand::distributions::Alphanumeric;
 use rand::distributions::DistString;
 use std::fmt;
@@ -173,6 +174,7 @@ impl FileFormat for JsonFormat {
         input: Arc<dyn ExecutionPlan>,
         _state: &SessionState,
         conf: FileSinkConfig,
+        order_requirements: Option<Vec<PhysicalSortRequirement>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         if conf.overwrite {
             return not_impl_err!("Overwrites are not implemented yet for 
Json");
@@ -184,7 +186,12 @@ impl FileFormat for JsonFormat {
         let sink_schema = conf.output_schema().clone();
         let sink = Arc::new(JsonSink::new(conf, self.file_compression_type));
 
-        Ok(Arc::new(FileSinkExec::new(input, sink, sink_schema)) as _)
+        Ok(Arc::new(FileSinkExec::new(
+            input,
+            sink,
+            sink_schema,
+            order_requirements,
+        )) as _)
     }
 
     fn file_type(&self) -> FileType {
diff --git a/datafusion/core/src/datasource/file_format/mod.rs 
b/datafusion/core/src/datasource/file_format/mod.rs
index 86f265ab94..293f062d86 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -41,7 +41,7 @@ use crate::execution::context::SessionState;
 use crate::physical_plan::{ExecutionPlan, Statistics};
 
 use datafusion_common::{not_impl_err, DataFusionError, FileType};
-use datafusion_physical_expr::PhysicalExpr;
+use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
 
 use async_trait::async_trait;
 use object_store::{ObjectMeta, ObjectStore};
@@ -99,6 +99,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
         _input: Arc<dyn ExecutionPlan>,
         _state: &SessionState,
         _conf: FileSinkConfig,
+        _order_requirements: Option<Vec<PhysicalSortRequirement>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         not_impl_err!("Writer not implemented for this format")
     }
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs 
b/datafusion/core/src/datasource/file_format/parquet.rs
index 8ddddab71f..062ec1329d 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -36,7 +36,7 @@ use async_trait::async_trait;
 use bytes::{BufMut, BytesMut};
 use datafusion_common::{exec_err, not_impl_err, plan_err, DataFusionError, 
FileType};
 use datafusion_execution::TaskContext;
-use datafusion_physical_expr::PhysicalExpr;
+use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
 use futures::{StreamExt, TryStreamExt};
 use hashbrown::HashMap;
 use object_store::{ObjectMeta, ObjectStore};
@@ -229,6 +229,7 @@ impl FileFormat for ParquetFormat {
         input: Arc<dyn ExecutionPlan>,
         _state: &SessionState,
         conf: FileSinkConfig,
+        order_requirements: Option<Vec<PhysicalSortRequirement>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         if conf.overwrite {
             return not_impl_err!("Overwrites are not implemented yet for 
Parquet");
@@ -237,7 +238,12 @@ impl FileFormat for ParquetFormat {
         let sink_schema = conf.output_schema().clone();
         let sink = Arc::new(ParquetSink::new(conf));
 
-        Ok(Arc::new(FileSinkExec::new(input, sink, sink_schema)) as _)
+        Ok(Arc::new(FileSinkExec::new(
+            input,
+            sink,
+            sink_schema,
+            order_requirements,
+        )) as _)
     }
 
     fn file_type(&self) -> FileType {
diff --git a/datafusion/core/src/datasource/listing/table.rs 
b/datafusion/core/src/datasource/listing/table.rs
index 7834de2b19..e2696d5233 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -57,7 +57,9 @@ use 
datafusion_execution::cache::cache_manager::FileStatisticsCache;
 use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
 use datafusion_expr::expr::Sort;
 use datafusion_optimizer::utils::conjunction;
-use datafusion_physical_expr::{create_physical_expr, LexOrdering, 
PhysicalSortExpr};
+use datafusion_physical_expr::{
+    create_physical_expr, LexOrdering, PhysicalSortExpr, 
PhysicalSortRequirement,
+};
 
 use async_trait::async_trait;
 use futures::{future, stream, StreamExt, TryStreamExt};
@@ -828,19 +830,6 @@ impl TableProvider for ListingTable {
             );
         }
 
-        // TODO support inserts to sorted tables which preserve sort_order
-        // Inserts currently make no effort to preserve sort_order. This could 
lead to
-        // incorrect query results on the table after inserting incorrectly 
sorted data.
-        let unsorted: Vec<Vec<Expr>> = vec![];
-        if self.options.file_sort_order != unsorted {
-            return Err(
-                DataFusionError::NotImplemented(
-                    "Writing to a sorted listing table via insert into is not 
supported yet. \
-                    To write to this table in the meantime, register an 
equivalent table with \
-                    file_sort_order = vec![]".into())
-            );
-        }
-
         let table_path = &self.table_paths()[0];
         // Get the object store for the table path.
         let store = state.runtime_env().object_store(table_path)?;
@@ -911,9 +900,38 @@ impl TableProvider for ListingTable {
             file_type_writer_options,
         };
 
+        let unsorted: Vec<Vec<Expr>> = vec![];
+        let order_requirements = if self.options().file_sort_order != unsorted 
{
+            if matches!(
+                self.options().insert_mode,
+                ListingTableInsertMode::AppendToFile
+            ) {
+                return Err(DataFusionError::Plan(
+                    "Cannot insert into a sorted ListingTable with mode 
append!".into(),
+                ));
+            }
+            // Multiple sort orders in outer vec are equivalent, so we pass 
only the first one
+            let ordering = self
+                .try_create_output_ordering()?
+                .get(0)
+                .ok_or(DataFusionError::Internal(
+                    "Expected ListingTable to have a sort order, but none 
found!".into(),
+                ))?
+                .clone();
+            // Converts Vec<Vec<SortExpr>> into type required by execution 
plan to specify its required input ordering
+            Some(
+                ordering
+                    .into_iter()
+                    .map(PhysicalSortRequirement::from)
+                    .collect::<Vec<_>>(),
+            )
+        } else {
+            None
+        };
+
         self.options()
             .format
-            .create_writer_physical_plan(input, state, config)
+            .create_writer_physical_plan(input, state, config, 
order_requirements)
             .await
     }
 }
diff --git a/datafusion/core/src/datasource/memory.rs 
b/datafusion/core/src/datasource/memory.rs
index 337a8cabc2..6231bd2c2f 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -223,6 +223,7 @@ impl TableProvider for MemTable {
             input,
             sink,
             self.schema.clone(),
+            None,
         )))
     }
 }
diff --git a/datafusion/core/src/physical_planner.rs 
b/datafusion/core/src/physical_planner.rs
index 84b5b9afa7..35119f374f 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -604,7 +604,7 @@ impl DefaultPhysicalPlanner {
                         FileType::ARROW => Arc::new(ArrowFormat {}),
                     };
 
-                    sink_format.create_writer_physical_plan(input_exec, 
session_state, config).await
+                    sink_format.create_writer_physical_plan(input_exec, 
session_state, config, None).await
                 }
                 LogicalPlan::Dml(DmlStatement {
                     table_name,
diff --git a/datafusion/physical-plan/src/insert.rs 
b/datafusion/physical-plan/src/insert.rs
index 8b467461dd..a7b0d32c8e 100644
--- a/datafusion/physical-plan/src/insert.rs
+++ b/datafusion/physical-plan/src/insert.rs
@@ -73,6 +73,8 @@ pub struct FileSinkExec {
     sink_schema: SchemaRef,
     /// Schema describing the structure of the output data.
     count_schema: SchemaRef,
+    /// Optional required sort order for output data.
+    sort_order: Option<Vec<PhysicalSortRequirement>>,
 }
 
 impl fmt::Debug for FileSinkExec {
@@ -87,12 +89,14 @@ impl FileSinkExec {
         input: Arc<dyn ExecutionPlan>,
         sink: Arc<dyn DataSink>,
         sink_schema: SchemaRef,
+        sort_order: Option<Vec<PhysicalSortRequirement>>,
     ) -> Self {
         Self {
             input,
             sink,
             sink_schema,
             count_schema: make_count_schema(),
+            sort_order,
         }
     }
 
@@ -192,16 +196,20 @@ impl ExecutionPlan for FileSinkExec {
     }
 
     fn required_input_ordering(&self) -> 
Vec<Option<Vec<PhysicalSortRequirement>>> {
-        // Require that the InsertExec gets the data in the order the
+        // The input order is either explicitly set (such as by a ListingTable),
+        // or require that the [FileSinkExec] gets the data in the order the
         // input produced it (otherwise the optimizer may chose to reorder
         // the input which could result in unintended / poor UX)
         //
         // More rationale:
         // 
https://github.com/apache/arrow-datafusion/pull/6354#discussion_r1195284178
-        vec![self
-            .input
-            .output_ordering()
-            .map(PhysicalSortRequirement::from_sort_exprs)]
+        match &self.sort_order {
+            Some(requirements) => vec![Some(requirements.clone())],
+            None => vec![self
+                .input
+                .output_ordering()
+                .map(PhysicalSortRequirement::from_sort_exprs)],
+        }
     }
 
     fn maintains_input_order(&self) -> Vec<bool> {
@@ -221,6 +229,7 @@ impl ExecutionPlan for FileSinkExec {
             sink: self.sink.clone(),
             sink_schema: self.sink_schema.clone(),
             count_schema: self.count_schema.clone(),
+            sort_order: self.sort_order.clone(),
         }))
     }
 
diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt 
b/datafusion/sqllogictest/test_files/insert_to_external.slt
index a29c230a46..d1b73204e3 100644
--- a/datafusion/sqllogictest/test_files/insert_to_external.slt
+++ b/datafusion/sqllogictest/test_files/insert_to_external.slt
@@ -45,6 +45,48 @@ LOCATION '../../testing/data/csv/aggregate_test_100.csv'
 statement ok
 set datafusion.execution.target_partitions = 8;
 
+statement ok
+CREATE EXTERNAL TABLE
+ordered_insert_test(a bigint, b bigint)
+STORED AS csv
+LOCATION 'test_files/scratch/insert_to_external/insert_to_ordered/'
+WITH ORDER (a ASC, B DESC)
+OPTIONS(
+create_local_path 'true',
+insert_mode 'append_new_files',
+);
+
+query TT
+EXPLAIN INSERT INTO ordered_insert_test values (5, 1), (4, 2), (7,7), (7,8), 
(7,9), (7,10), (3, 3), (2, 4), (1, 5);
+----
+logical_plan
+Dml: op=[Insert Into] table=[ordered_insert_test]
+--Projection: column1 AS a, column2 AS b
+----Values: (Int64(5), Int64(1)), (Int64(4), Int64(2)), (Int64(7), Int64(7)), 
(Int64(7), Int64(8)), (Int64(7), Int64(9))...
+physical_plan
+InsertExec: sink=CsvSink(writer_mode=PutMultipart, file_groups=[])
+--SortExec: expr=[a@0 ASC NULLS LAST,b@1 DESC]
+----ProjectionExec: expr=[column1@0 as a, column2@1 as b]
+------ValuesExec
+
+query II
+INSERT INTO ordered_insert_test values (5, 1), (4, 2), (7,7), (7,8), (7,9), 
(7,10), (3, 3), (2, 4), (1, 5);
+----
+9
+
+query II
+SELECT * from ordered_insert_test;
+----
+1 5
+2 4
+3 3
+4 2
+5 1
+7 10
+7 9
+7 8
+7 7
+
 statement ok
 CREATE EXTERNAL TABLE
 single_file_test(a bigint, b bigint)

Reply via email to