This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 250e71694b Support Partitioning Data by Dictionary Encoded String
Array Types (#7896)
250e71694b is described below
commit 250e71694b4f9789444e52b8cc12476dcbf35ac6
Author: Devin D'Angelo <[email protected]>
AuthorDate: Sat Oct 28 06:04:47 2023 -0400
Support Partitioning Data by Dictionary Encoded String Array Types (#7896)
* support dictionary encoded string columns for partition cols
* remove debug prints
* cargo fmt
* generic dictionary cast and dict encoded test
* updates from review
* force retry checks
* try checks again
---
datafusion/common/src/dfschema.rs | 91 ++++++++++++++++++++++
.../core/src/datasource/file_format/write/demux.rs | 18 ++++-
datafusion/core/src/datasource/listing/table.rs | 5 +-
datafusion/core/src/datasource/memory.rs | 5 +-
.../sqllogictest/test_files/insert_to_external.slt | 38 ++++++++-
5 files changed, 153 insertions(+), 4 deletions(-)
diff --git a/datafusion/common/src/dfschema.rs
b/datafusion/common/src/dfschema.rs
index e16acbfedc..d8cd103a47 100644
--- a/datafusion/common/src/dfschema.rs
+++ b/datafusion/common/src/dfschema.rs
@@ -391,11 +391,33 @@ impl DFSchema {
})
}
+ /// Returns true if the two schemas have the same qualified named
+ /// fields with logically equivalent data types. Returns false otherwise.
+ ///
+ /// Use [`DFSchema::equivalent_names_and_types`] for stricter semantic type
+ /// equivalence checking.
+ pub fn logically_equivalent_names_and_types(&self, other: &Self) -> bool {
+ if self.fields().len() != other.fields().len() {
+ return false;
+ }
+ let self_fields = self.fields().iter();
+ let other_fields = other.fields().iter();
+ self_fields.zip(other_fields).all(|(f1, f2)| {
+ f1.qualifier() == f2.qualifier()
+ && f1.name() == f2.name()
+ && Self::datatype_is_logically_equal(f1.data_type(),
f2.data_type())
+ })
+ }
+
/// Returns true if the two schemas have the same qualified named
/// fields with the same data types. Returns false otherwise.
///
/// This is a specialized version of Eq that ignores differences
/// in nullability and metadata.
+ ///
+ /// Use [`DFSchema::logically_equivalent_names_and_types`] for a weaker form
+ /// of logical type checking, which for example would consider a dictionary
+ /// encoded UTF8 array to be equivalent to a plain UTF8 array.
pub fn equivalent_names_and_types(&self, other: &Self) -> bool {
if self.fields().len() != other.fields().len() {
return false;
@@ -409,6 +431,46 @@ impl DFSchema {
})
}
+ /// Checks if two [`DataType`]s are logically equal. This is a notably
weaker constraint
+ /// than datatype_is_semantically_equal in that a Dictionary<K,V> type is
logically
+ /// equal to a plain V type, but not semantically equal. Dictionary<K1,
V1> is also
+ /// logically equal to Dictionary<K2, V1>.
+ fn datatype_is_logically_equal(dt1: &DataType, dt2: &DataType) -> bool {
+ // check nested fields
+ match (dt1, dt2) {
+ (DataType::Dictionary(_, v1), DataType::Dictionary(_, v2)) => {
+ v1.as_ref() == v2.as_ref()
+ }
+ (DataType::Dictionary(_, v1), othertype) => v1.as_ref() ==
othertype,
+ (othertype, DataType::Dictionary(_, v1)) => v1.as_ref() ==
othertype,
+ (DataType::List(f1), DataType::List(f2))
+ | (DataType::LargeList(f1), DataType::LargeList(f2))
+ | (DataType::FixedSizeList(f1, _), DataType::FixedSizeList(f2, _))
+ | (DataType::Map(f1, _), DataType::Map(f2, _)) => {
+ Self::field_is_logically_equal(f1, f2)
+ }
+ (DataType::Struct(fields1), DataType::Struct(fields2)) => {
+ let iter1 = fields1.iter();
+ let iter2 = fields2.iter();
+ fields1.len() == fields2.len() &&
+ // all fields have to be the same
+ iter1
+ .zip(iter2)
+ .all(|(f1, f2)| Self::field_is_logically_equal(f1, f2))
+ }
+ (DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
+ let iter1 = fields1.iter();
+ let iter2 = fields2.iter();
+ fields1.len() == fields2.len() &&
+ // all fields have to be the same
+ iter1
+ .zip(iter2)
+ .all(|((t1, f1), (t2, f2))| t1 == t2 &&
Self::field_is_logically_equal(f1, f2))
+ }
+ _ => dt1 == dt2,
+ }
+ }
+
/// Returns true if two [`DataType`]s are semantically equal (same
/// name and type), ignoring both metadata and nullability.
///
@@ -456,6 +518,11 @@ impl DFSchema {
}
}
+ fn field_is_logically_equal(f1: &Field, f2: &Field) -> bool {
+ f1.name() == f2.name()
+ && Self::datatype_is_logically_equal(f1.data_type(),
f2.data_type())
+ }
+
fn field_is_semantically_equal(f1: &Field, f2: &Field) -> bool {
f1.name() == f2.name()
&& Self::datatype_is_semantically_equal(f1.data_type(),
f2.data_type())
@@ -786,6 +853,13 @@ pub trait SchemaExt {
///
/// It works the same as [`DFSchema::equivalent_names_and_types`].
fn equivalent_names_and_types(&self, other: &Self) -> bool;
+
+ /// Returns true if the two schemas have the same qualified named
+ /// fields with logically equivalent data types. Returns false otherwise.
+ ///
+ /// Use [`DFSchema::equivalent_names_and_types`] for stricter semantic type
+ /// equivalence checking.
+ fn logically_equivalent_names_and_types(&self, other: &Self) -> bool;
}
impl SchemaExt for Schema {
@@ -805,6 +879,23 @@ impl SchemaExt for Schema {
)
})
}
+
+ fn logically_equivalent_names_and_types(&self, other: &Self) -> bool {
+ if self.fields().len() != other.fields().len() {
+ return false;
+ }
+
+ self.fields()
+ .iter()
+ .zip(other.fields().iter())
+ .all(|(f1, f2)| {
+ f1.name() == f2.name()
+ && DFSchema::datatype_is_logically_equal(
+ f1.data_type(),
+ f2.data_type(),
+ )
+ })
+ }
}
#[cfg(test)]
diff --git a/datafusion/core/src/datasource/file_format/write/demux.rs
b/datafusion/core/src/datasource/file_format/write/demux.rs
index 67dd1f9406..27c65dd459 100644
--- a/datafusion/core/src/datasource/file_format/write/demux.rs
+++ b/datafusion/core/src/datasource/file_format/write/demux.rs
@@ -29,7 +29,7 @@ use crate::physical_plan::SendableRecordBatchStream;
use arrow_array::builder::UInt64Builder;
use arrow_array::cast::AsArray;
-use arrow_array::{RecordBatch, StructArray};
+use arrow_array::{downcast_dictionary_array, RecordBatch, StringArray,
StructArray};
use arrow_schema::{DataType, Schema};
use datafusion_common::cast::as_string_array;
use datafusion_common::DataFusionError;
@@ -338,6 +338,22 @@ fn compute_partition_keys_by_row<'a>(
partition_values.push(array.value(i));
}
}
+ DataType::Dictionary(_, _) => {
+ downcast_dictionary_array!(
+ col_array => {
+ let array = col_array.downcast_dict::<StringArray>()
+ .ok_or(DataFusionError::Execution(format!("it is
not yet supported to write to hive partitions with datatype {}",
+ dtype)))?;
+
+ for val in array.values() {
+ partition_values.push(
+
val.ok_or(DataFusionError::Execution(format!("Cannot partition by null value
for column {}", col)))?
+ );
+ }
+ },
+ _ => unreachable!(),
+ )
+ }
_ => {
return Err(DataFusionError::NotImplemented(format!(
"it is not yet supported to write to hive partitions with
datatype {}",
diff --git a/datafusion/core/src/datasource/listing/table.rs
b/datafusion/core/src/datasource/listing/table.rs
index 822a78a552..d26d417bd8 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -824,7 +824,10 @@ impl TableProvider for ListingTable {
overwrite: bool,
) -> Result<Arc<dyn ExecutionPlan>> {
// Check that the schema of the plan matches the schema of this table.
- if !self.schema().equivalent_names_and_types(&input.schema()) {
+ if !self
+ .schema()
+ .logically_equivalent_names_and_types(&input.schema())
+ {
return plan_err!(
// Return an error if schema of the input query does not match
with the table schema.
"Inserting query must have the same schema with the table."
diff --git a/datafusion/core/src/datasource/memory.rs
b/datafusion/core/src/datasource/memory.rs
index a2f8e225e1..6bcaa97a40 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -209,7 +209,10 @@ impl TableProvider for MemTable {
) -> Result<Arc<dyn ExecutionPlan>> {
// Create a physical plan from the logical plan.
// Check that the schema of the plan matches the schema of this table.
- if !self.schema().equivalent_names_and_types(&input.schema()) {
+ if !self
+ .schema()
+ .logically_equivalent_names_and_types(&input.schema())
+ {
return plan_err!(
"Inserting query must have the same schema with the table."
);
diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt
b/datafusion/sqllogictest/test_files/insert_to_external.slt
index b2206e9878..8b01a14568 100644
--- a/datafusion/sqllogictest/test_files/insert_to_external.slt
+++ b/datafusion/sqllogictest/test_files/insert_to_external.slt
@@ -40,8 +40,44 @@ STORED AS CSV
WITH HEADER ROW
LOCATION '../../testing/data/csv/aggregate_test_100.csv'
-# test_insert_into
+statement ok
+create table dictionary_encoded_values as values
+('a', arrow_cast('foo', 'Dictionary(Int32, Utf8)')), ('b', arrow_cast('bar',
'Dictionary(Int32, Utf8)'));
+
+query TTT
+describe dictionary_encoded_values;
+----
+column1 Utf8 YES
+column2 Dictionary(Int32, Utf8) YES
+
+statement ok
+CREATE EXTERNAL TABLE dictionary_encoded_parquet_partitioned(
+ a varchar,
+ b varchar,
+)
+STORED AS parquet
+LOCATION 'test_files/scratch/insert_to_external/parquet_types_partitioned'
+PARTITIONED BY (b)
+OPTIONS(
+create_local_path 'true',
+insert_mode 'append_new_files',
+);
+
+query TT
+insert into dictionary_encoded_parquet_partitioned
+select * from dictionary_encoded_values
+----
+2
+
+query TT
+select * from dictionary_encoded_parquet_partitioned order by (a);
+----
+a foo
+b bar
+
+
+# test_insert_into
statement ok
set datafusion.execution.target_partitions = 8;