This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 250e71694b Support Partitioning Data by Dictionary Encoded String Array Types (#7896)
250e71694b is described below

commit 250e71694b4f9789444e52b8cc12476dcbf35ac6
Author: Devin D'Angelo <[email protected]>
AuthorDate: Sat Oct 28 06:04:47 2023 -0400

    Support Partitioning Data by Dictionary Encoded String Array Types (#7896)
    
    * support dictionary encoded string columns for partition cols
    
    * remove debug prints
    
    * cargo fmt
    
    * generic dictionary cast and dict encoded test
    
    * updates from review
    
    * force retry checks
    
    * try checks again
---
 datafusion/common/src/dfschema.rs                  | 91 ++++++++++++++++++++++
 .../core/src/datasource/file_format/write/demux.rs | 18 ++++-
 datafusion/core/src/datasource/listing/table.rs    |  5 +-
 datafusion/core/src/datasource/memory.rs           |  5 +-
 .../sqllogictest/test_files/insert_to_external.slt | 38 ++++++++-
 5 files changed, 153 insertions(+), 4 deletions(-)

diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs
index e16acbfedc..d8cd103a47 100644
--- a/datafusion/common/src/dfschema.rs
+++ b/datafusion/common/src/dfschema.rs
@@ -391,11 +391,33 @@ impl DFSchema {
             })
     }
 
+    /// Returns true if the two schemas have the same qualified named
+    /// fields with logically equivalent data types. Returns false otherwise.
+    ///
+    /// Use [DFSchema]::equivalent_names_and_types for stricter semantic type
+    /// equivalence checking.
+    pub fn logically_equivalent_names_and_types(&self, other: &Self) -> bool {
+        if self.fields().len() != other.fields().len() {
+            return false;
+        }
+        let self_fields = self.fields().iter();
+        let other_fields = other.fields().iter();
+        self_fields.zip(other_fields).all(|(f1, f2)| {
+            f1.qualifier() == f2.qualifier()
+                && f1.name() == f2.name()
+                && Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
+        })
+    }
+
     /// Returns true if the two schemas have the same qualified named
     /// fields with the same data types. Returns false otherwise.
     ///
     /// This is a specialized version of Eq that ignores differences
     /// in nullability and metadata.
+    ///
+    /// Use [DFSchema]::logically_equivalent_names_and_types for a weaker
+    /// logical type checking, which for example would consider a dictionary
+    /// encoded UTF8 array to be equivalent to a plain UTF8 array.
     pub fn equivalent_names_and_types(&self, other: &Self) -> bool {
         if self.fields().len() != other.fields().len() {
             return false;
@@ -409,6 +431,46 @@ impl DFSchema {
         })
     }
 
+    /// Checks if two [`DataType`]s are logically equal. This is a notably weaker constraint
+    /// than datatype_is_semantically_equal in that a Dictionary<K,V> type is logically
+    /// equal to a plain V type, but not semantically equal. Dictionary<K1, V1> is also
+    /// logically equal to Dictionary<K2, V1>.
+    fn datatype_is_logically_equal(dt1: &DataType, dt2: &DataType) -> bool {
+        // check nested fields
+        match (dt1, dt2) {
+            (DataType::Dictionary(_, v1), DataType::Dictionary(_, v2)) => {
+                v1.as_ref() == v2.as_ref()
+            }
+            (DataType::Dictionary(_, v1), othertype) => v1.as_ref() == othertype,
+            (othertype, DataType::Dictionary(_, v1)) => v1.as_ref() == othertype,
+            (DataType::List(f1), DataType::List(f2))
+            | (DataType::LargeList(f1), DataType::LargeList(f2))
+            | (DataType::FixedSizeList(f1, _), DataType::FixedSizeList(f2, _))
+            | (DataType::Map(f1, _), DataType::Map(f2, _)) => {
+                Self::field_is_logically_equal(f1, f2)
+            }
+            (DataType::Struct(fields1), DataType::Struct(fields2)) => {
+                let iter1 = fields1.iter();
+                let iter2 = fields2.iter();
+                fields1.len() == fields2.len() &&
+                        // all fields have to be the same
+                    iter1
+                    .zip(iter2)
+                        .all(|(f1, f2)| Self::field_is_logically_equal(f1, f2))
+            }
+            (DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
+                let iter1 = fields1.iter();
+                let iter2 = fields2.iter();
+                fields1.len() == fields2.len() &&
+                    // all fields have to be the same
+                    iter1
+                        .zip(iter2)
+                        .all(|((t1, f1), (t2, f2))| t1 == t2 && Self::field_is_logically_equal(f1, f2))
+            }
+            _ => dt1 == dt2,
+        }
+    }
+
     /// Returns true of two [`DataType`]s are semantically equal (same
     /// name and type), ignoring both metadata and nullability.
     ///
@@ -456,6 +518,11 @@ impl DFSchema {
         }
     }
 
+    fn field_is_logically_equal(f1: &Field, f2: &Field) -> bool {
+        f1.name() == f2.name()
+            && Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
+    }
+
     fn field_is_semantically_equal(f1: &Field, f2: &Field) -> bool {
         f1.name() == f2.name()
             && Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
@@ -786,6 +853,13 @@ pub trait SchemaExt {
     ///
     /// It works the same as [`DFSchema::equivalent_names_and_types`].
     fn equivalent_names_and_types(&self, other: &Self) -> bool;
+
+    /// Returns true if the two schemas have the same qualified named
+    /// fields with logically equivalent data types. Returns false otherwise.
+    ///
+    /// Use [DFSchema]::equivalent_names_and_types for stricter semantic type
+    /// equivalence checking.
+    fn logically_equivalent_names_and_types(&self, other: &Self) -> bool;
 }
 
 impl SchemaExt for Schema {
@@ -805,6 +879,23 @@ impl SchemaExt for Schema {
                     )
             })
     }
+
+    fn logically_equivalent_names_and_types(&self, other: &Self) -> bool {
+        if self.fields().len() != other.fields().len() {
+            return false;
+        }
+
+        self.fields()
+            .iter()
+            .zip(other.fields().iter())
+            .all(|(f1, f2)| {
+                f1.name() == f2.name()
+                    && DFSchema::datatype_is_logically_equal(
+                        f1.data_type(),
+                        f2.data_type(),
+                    )
+            })
+    }
 }
 
 #[cfg(test)]
diff --git a/datafusion/core/src/datasource/file_format/write/demux.rs b/datafusion/core/src/datasource/file_format/write/demux.rs
index 67dd1f9406..27c65dd459 100644
--- a/datafusion/core/src/datasource/file_format/write/demux.rs
+++ b/datafusion/core/src/datasource/file_format/write/demux.rs
@@ -29,7 +29,7 @@ use crate::physical_plan::SendableRecordBatchStream;
 
 use arrow_array::builder::UInt64Builder;
 use arrow_array::cast::AsArray;
-use arrow_array::{RecordBatch, StructArray};
+use arrow_array::{downcast_dictionary_array, RecordBatch, StringArray, StructArray};
 use arrow_schema::{DataType, Schema};
 use datafusion_common::cast::as_string_array;
 use datafusion_common::DataFusionError;
@@ -338,6 +338,22 @@ fn compute_partition_keys_by_row<'a>(
                     partition_values.push(array.value(i));
                 }
             }
+            DataType::Dictionary(_, _) => {
+                downcast_dictionary_array!(
+                    col_array =>  {
+                        let array = col_array.downcast_dict::<StringArray>()
+                            .ok_or(DataFusionError::Execution(format!("it is not yet supported to write to hive partitions with datatype {}",
+                            dtype)))?;
+
+                        for val in array.values() {
+                            partition_values.push(
+                                val.ok_or(DataFusionError::Execution(format!("Cannot partition by null value for column {}", col)))?
+                            );
+                        }
+                    },
+                    _ => unreachable!(),
+                )
+            }
             _ => {
                 return Err(DataFusionError::NotImplemented(format!(
                 "it is not yet supported to write to hive partitions with datatype {}",
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 822a78a552..d26d417bd8 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -824,7 +824,10 @@ impl TableProvider for ListingTable {
         overwrite: bool,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // Check that the schema of the plan matches the schema of this table.
-        if !self.schema().equivalent_names_and_types(&input.schema()) {
+        if !self
+            .schema()
+            .logically_equivalent_names_and_types(&input.schema())
+        {
             return plan_err!(
                 // Return an error if schema of the input query does not match with the table schema.
                 "Inserting query must have the same schema with the table."
diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs
index a2f8e225e1..6bcaa97a40 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -209,7 +209,10 @@ impl TableProvider for MemTable {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // Create a physical plan from the logical plan.
         // Check that the schema of the plan matches the schema of this table.
-        if !self.schema().equivalent_names_and_types(&input.schema()) {
+        if !self
+            .schema()
+            .logically_equivalent_names_and_types(&input.schema())
+        {
             return plan_err!(
                 "Inserting query must have the same schema with the table."
             );
diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt b/datafusion/sqllogictest/test_files/insert_to_external.slt
index b2206e9878..8b01a14568 100644
--- a/datafusion/sqllogictest/test_files/insert_to_external.slt
+++ b/datafusion/sqllogictest/test_files/insert_to_external.slt
@@ -40,8 +40,44 @@ STORED AS CSV
 WITH HEADER ROW
 LOCATION '../../testing/data/csv/aggregate_test_100.csv'
 
-# test_insert_into
 
+statement ok
+create table dictionary_encoded_values as values 
+('a', arrow_cast('foo', 'Dictionary(Int32, Utf8)')), ('b', arrow_cast('bar', 'Dictionary(Int32, Utf8)'));
+
+query TTT
+describe dictionary_encoded_values;
+----
+column1 Utf8 YES
+column2 Dictionary(Int32, Utf8) YES
+
+statement ok
+CREATE EXTERNAL TABLE dictionary_encoded_parquet_partitioned(
+  a varchar,
+  b varchar,
+) 
+STORED AS parquet
+LOCATION 'test_files/scratch/insert_to_external/parquet_types_partitioned'
+PARTITIONED BY (b)
+OPTIONS(
+create_local_path 'true',
+insert_mode 'append_new_files',
+);
+
+query TT
+insert into dictionary_encoded_parquet_partitioned 
+select * from dictionary_encoded_values
+----
+2
+
+query TT
+select * from dictionary_encoded_parquet_partitioned order by (a);
+----
+a foo
+b bar
+
+
+# test_insert_into
 statement ok
 set datafusion.execution.target_partitions = 8;
 

Reply via email to