notfilippo opened a new issue, #11513:
URL: https://github.com/apache/datafusion/issues/11513

   ## Abstract
   Logical types are an abstract representation of data that emphasises 
structure without regard for physical implementation, while physical types are 
tangible representations that describe how data will be stored, accessed, and 
retrieved.
   
   Currently the type model in DataFusion, in both logical and physical plans, relies purely on arrow’s [DataType](https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html) enum. While this choice makes sense for its physical execution engine (DataTypes map 1:1 with the physical array representation, defining implicit rules for storage, access, and retrieval), it has flaws when dealing with logical plans. This is because some logically equivalent array types in the Arrow format have very different DataTypes. For example, a logical string array can be represented by an Arrow array of DataType `Utf8`, `Dictionary(Utf8)`, `RunEndEncoded(Utf8)`, or `Utf8View` (not to mention the different index types that can be specified for dictionaries and the run-end types for REE arrays).
   
   This proposal evaluates possible solutions for decoupling the notion of types in DataFusion’s logical plans from DataTypes, evaluating their impact on DataFusion itself and on downstream crates.
   ## Goals
   
   - Introduce a subset of [DataType](https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html)s to be used as logical types
   - Define the boundaries of logical types and physical types in the planning 
and execution path
   - Define a solution to keep track of a column’s logical and physical type
   - Introduce a new logical [ScalarValue](https://docs.rs/datafusion/latest/datafusion/common/enum.ScalarValue.html)
   - Define how to specify materialisation options for physical types
   - Evaluate the impact on downstream crates
   ## Proposal
   ### Defining a logical type
   To define the list of logical types we must first take a look at the physical representation of the engine: the Arrow columnar format. DataTypes are the physical types of the DataFusion engine, and they define storage and access patterns for buffers in the Arrow format.
   
   Looking at a list of the possible DataTypes, it's clear that while some map 1:1 with their logical representation, others also specify information about the encoding (e.g. `Large*`, `FixedSize*`, `Dictionary`, `RunEndEncoded`...). The latter must be consolidated into what they represent, discarding the encoding information. In general, types that can store different ranges of values should be different logical types ([ref](https://github.com/apache/datafusion/pull/11160#discussion_r1660952712)).
   
   What follows is a list of DataTypes and how they would map to their respective logical types following the rules above:
   
   | [DataType](https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html) (aka physical type) | Logical type | Note |
   | --- | --- | --- |
   | `Null` | `Null` | |
   | `Boolean` | `Boolean` | |
   | `Int8` | `Int8` | |
   | `Int16` | `Int16` | |
   | `Int32` | `Int32` | |
   | `Int64` | `Int64` | |
   | `UInt8` | `UInt8` | |
   | `UInt16` | `UInt16` | |
   | `UInt32` | `UInt32` | |
   | `UInt64` | `UInt64` | |
   | `Float16` | `Float16` | |
   | `Float32` | `Float32` | |
   | `Float64` | `Float64` | |
   | `Timestamp(unit, tz)` | `Timestamp(unit, tz)` | |
   | `Date32` | `Date` | |
   | `Date64` | `Date` | `Date64` doesn't actually provide more precision. ([docs](https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html#variant.Date64)) |
   | `Time32(unit)` | `Time32(unit)` | |
   | `Time64(unit)` | `Time64(unit)` | |
   | `Duration(unit)` | `Duration(unit)` | |
   | `Interval(unit)` | `Interval(unit)` | |
   | `Binary` | `Binary` | |
   | `FixedSizeBinary(size)` | `Binary` | |
   | `LargeBinary` | `Binary` | |
   | `BinaryView` | `Binary` | |
   | `Utf8` | `Utf8` | |
   | `LargeUtf8` | `Utf8` | |
   | `Utf8View` | `Utf8` | |
   | `List(field)` | `List(field)` | |
   | `ListView(field)` | `List(field)` | |
   | `FixedSizeList(field, size)` | `List(field)` | |
   | `LargeList(field)` | `List(field)` | |
   | `LargeListView(field)` | `List(field)` | |
   | `Struct(fields)` | `Struct(fields)` | |
   | `Union(fields, mode)` | `Union(fields)` | |
   | `Dictionary(index_type, data_type)` | underlying `data_type`, converted to logical type | |
   | `Decimal128(precision, scale)` | `Decimal128(precision, scale)` | |
   | `Decimal256(precision, scale)` | `Decimal256(precision, scale)` | |
   | `Map(fields, sorted)` | `Map(fields, sorted)` | |
   | `RunEndEncoded(run_ends_type, data_type)` | underlying `data_type`, converted to logical type | |
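
   A minimal sketch of how the mapping in the table above could be expressed as a `From` conversion. To stay self-contained it uses a simplified, hypothetical stand-in for Arrow's `DataType` covering only a handful of variants (the real `RunEndEncoded` variant holds fields rather than bare types), and `LogicalType` is likewise an illustrative subset, not an existing DataFusion type.

   ```rust
   // Simplified stand-in for arrow::datatypes::DataType (hypothetical subset).
   #[derive(Clone, Debug, PartialEq)]
   pub enum DataType {
       Int32,
       Utf8,
       LargeUtf8,
       Utf8View,
       Binary,
       FixedSizeBinary(i32),
       Date32,
       Date64,
       Dictionary(Box<DataType>, Box<DataType>),
       RunEndEncoded(Box<DataType>, Box<DataType>),
   }

   // Hypothetical subset of the proposed logical types.
   #[derive(Clone, Debug, PartialEq)]
   pub enum LogicalType {
       Int32,
       Utf8,
       Binary,
       Date,
   }

   impl From<&DataType> for LogicalType {
       fn from(data_type: &DataType) -> Self {
           match data_type {
               DataType::Int32 => LogicalType::Int32,
               // All string encodings collapse into one logical type.
               DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => LogicalType::Utf8,
               // Per the table, FixedSizeBinary maps to Binary.
               DataType::Binary | DataType::FixedSizeBinary(_) => LogicalType::Binary,
               // Date64 adds no precision over Date32, so both become Date.
               DataType::Date32 | DataType::Date64 => LogicalType::Date,
               // Encodings are discarded: map the value type instead.
               DataType::Dictionary(_, value) | DataType::RunEndEncoded(_, value) => {
                   LogicalType::from(&**value)
               }
           }
       }
   }
   ```

   Recursing into the value type means nested encodings (e.g. a dictionary of a run-end encoded array) resolve to the same logical type without extra rules.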
   #### User defined types
   ##### User defined physical types
   The Arrow columnar format provides guidelines to define [Extension types](https://arrow.apache.org/docs/format/Columnar.html#extension-types) through the composition of native DataTypes and custom metadata in fields. Since this proposal already includes a mapping from DataType to logical type, we could extend it to support user defined types (through extension types) which would map to a known logical type.
   
   For example an extension type with the DataType of `List(UInt8)` and a 
custom metadata field `{'ARROW:extension:name': 'myorg.mystring'}` could have a 
logical type of `Utf8`.
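
   A minimal sketch of resolving a field's logical type from its extension metadata, reusing the `myorg.mystring` example. `ExtensionRegistry` is a hypothetical helper, and `DataType` / `LogicalType` are simplified stand-ins; only the `ARROW:extension:name` metadata key comes from the Arrow specification.

   ```rust
   use std::collections::HashMap;

   // Simplified stand-ins (hypothetical subsets of the real types).
   #[derive(Clone, Debug, PartialEq)]
   pub enum DataType {
       UInt8,
       Utf8,
       List(Box<DataType>),
   }

   #[derive(Clone, Debug, PartialEq)]
   pub enum LogicalType {
       UInt8,
       Utf8,
       List(Box<LogicalType>),
   }

   impl From<&DataType> for LogicalType {
       fn from(data_type: &DataType) -> Self {
           match data_type {
               DataType::UInt8 => LogicalType::UInt8,
               DataType::Utf8 => LogicalType::Utf8,
               DataType::List(inner) => LogicalType::List(Box::new(LogicalType::from(&**inner))),
           }
       }
   }

   // Field metadata key under which Arrow stores the extension type name.
   pub const EXTENSION_NAME_KEY: &str = "ARROW:extension:name";

   // Hypothetical registry mapping extension names to known logical types.
   #[derive(Default)]
   pub struct ExtensionRegistry {
       logical_types: HashMap<String, LogicalType>,
   }

   impl ExtensionRegistry {
       pub fn register(&mut self, name: &str, logical: LogicalType) {
           self.logical_types.insert(name.to_string(), logical);
       }

       // A registered extension name wins; otherwise fall back to the
       // native DataType -> logical type mapping.
       pub fn logical_type(&self, data_type: &DataType, metadata: &HashMap<String, String>) -> LogicalType {
           metadata
               .get(EXTENSION_NAME_KEY)
               .and_then(|name| self.logical_types.get(name))
               .cloned()
               .unwrap_or_else(|| LogicalType::from(data_type))
       }
   }
   ```

   Unknown extension names degrade gracefully to the logical type of the storage DataType.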
   ##### User defined logical types
   Arrow extension types can also be used to extend the list of supported 
logical types. An additional logical type called `Extension` could be 
introduced. This extension type would contain a structure detailing its logical 
type and the extension type metadata.
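
   One possible shape for such an `Extension` logical type, sketched under the assumption that it carries the extension name, the serialized extension metadata and the logical type of its storage. `ExtensionLogicalType` and its fields are hypothetical; `ARROW:extension:name` and `ARROW:extension:metadata` are the metadata keys defined by the Arrow specification.

   ```rust
   // Hypothetical subset of logical types plus the proposed Extension variant.
   #[derive(Clone, Debug, PartialEq)]
   pub enum LogicalType {
       Utf8,
       Binary,
       Extension(ExtensionLogicalType),
   }

   // Hypothetical payload of the Extension logical type.
   #[derive(Clone, Debug, PartialEq)]
   pub struct ExtensionLogicalType {
       // Value of the `ARROW:extension:name` field metadata key.
       pub name: String,
       // Value of the `ARROW:extension:metadata` key (may be empty).
       pub metadata: String,
       // Logical type of the storage the extension is built on.
       pub storage: Box<LogicalType>,
   }

   impl LogicalType {
       // Logical type to fall back to when a consumer does not recognise the
       // extension; nested extensions are unwrapped recursively.
       pub fn storage_type(&self) -> &LogicalType {
           match self {
               LogicalType::Extension(ext) => ext.storage.storage_type(),
               other => other,
           }
       }

       // Name of the extension, if this is an extension logical type.
       pub fn extension_name(&self) -> Option<&str> {
           match self {
               LogicalType::Extension(ext) => Some(ext.name.as_str()),
               _ => None,
           }
       }
   }
   ```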
   ### Boundaries of logical and physical types
   #### In plans and expressions
   As the prefix suggests, logical types should be used exclusively in logical 
plans 
([LogicalPlan](https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.LogicalPlan.html)
 and 
[Expr](https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.Expr.html))
 while physical types should be used exclusively in physical plans 
([ExecutionPlan](https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html)
 and 
[PhysicalExpr](https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.PhysicalExpr.html)).
 This would enable logical plans to be purely logical, without worrying about 
underlying encodings.
   
   
[Expr](https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.Expr.html)
 in logical plans would need to represent their resulting value as logical 
types through the trait method 
[ExprSchemable::get_type](https://docs.rs/datafusion/latest/datafusion/logical_expr/trait.ExprSchemable.html#tymethod.get_type),
 which would need to return a logical type instead.
   #### In functions
   
[ScalarUDF](https://docs.rs/datafusion/latest/datafusion/logical_expr/trait.ScalarUDFImpl.html),
 
[WindowUDF](https://docs.rs/datafusion/latest/datafusion/logical_expr/trait.WindowUDFImpl.html),
 and 
[AggregateUDF](https://docs.rs/datafusion/latest/datafusion/logical_expr/trait.AggregateUDFImpl.html)
 all define their 
[Signatures](https://docs.rs/datafusion/latest/datafusion/logical_expr/struct.Signature.html)
 through the use of DataTypes. Function arguments are currently validated 
against signatures through [type coercion during logical 
planning](https://github.com/apache/datafusion/blob/08c5345e932f1c5c948751e0d06b1fd99e174efa/datafusion/expr/src/type_coercion/functions.rs#L484).
 With logical types, Signatures could be expressed without the need to specify the underlying encoding. This would simplify the type coercion rules, removing the need to traverse dictionaries and handle different containers, and focusing instead on explicit logical rules (e.g. all logical types can be coerced to `Utf8`).
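
   A minimal sketch of signature checking on logical types. `LogicalSignature` and `can_coerce` are hypothetical names, not DataFusion's current `Signature` API, and the only coercion rule implemented is the illustrative one above (anything can be coerced to `Utf8`). Since encodings have already been collapsed into logical types, the check never needs to look inside a dictionary or REE array.

   ```rust
   // Hypothetical subset of logical types.
   #[derive(Clone, Copy, Debug, PartialEq)]
   pub enum LogicalType {
       Int64,
       Float64,
       Utf8,
   }

   // Hypothetical logical signature: accepted logical types per argument.
   pub struct LogicalSignature {
       pub arguments: Vec<Vec<LogicalType>>,
   }

   // Illustrative rule from the text: any logical type can be coerced to Utf8.
   fn can_coerce(from: LogicalType, to: LogicalType) -> bool {
       from == to || to == LogicalType::Utf8
   }

   impl LogicalSignature {
       // Returns the logical type each argument should be coerced to, or None
       // if the call does not match the signature.
       pub fn coerce(&self, args: &[LogicalType]) -> Option<Vec<LogicalType>> {
           if args.len() != self.arguments.len() {
               return None;
           }
           args.iter()
               .zip(&self.arguments)
               .map(|(arg, accepted)| {
                   // Prefer an exact match, then the first coercible candidate.
                   accepted
                       .iter()
                       .copied()
                       .find(|t| t == arg)
                       .or_else(|| accepted.iter().copied().find(|t| can_coerce(*arg, *t)))
               })
               .collect()
       }
   }
   ```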
   
   During execution the function receives a slice of [ColumnarValue](https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.ColumnarValue.html) that is guaranteed to match the signature. Since execution is strictly a physical operation, the function will have to deal with physical types. The [ColumnarValue](https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.ColumnarValue.html) enum could be extended so that functions could choose to provide their own optimised implementation for a subset of physical types and then fall back to a generic implementation that materialises the argument to a known physical type. This would potentially allow native functions to support user defined physical types that map to known logical types.
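
   A minimal sketch of the fast-path/fallback pattern for a hypothetical upper-casing string UDF. `StringColumn` is a toy stand-in for several physical arrays sharing the logical type `Utf8`, not Arrow's array types or DataFusion's `ColumnarValue`: the dictionary encoding is handled natively by transforming only the distinct values, while any other encoding is first materialised into the plain representation.

   ```rust
   // Hypothetical stand-ins for physical arrays with the logical type Utf8.
   #[derive(Clone, Debug, PartialEq)]
   pub enum StringColumn {
       // Plain encoding: one value per row.
       Utf8(Vec<String>),
       // Dictionary encoding: distinct values plus one key per row.
       Dictionary { keys: Vec<usize>, values: Vec<String> },
       // Run-end encoding: values[i] repeats until row run_ends[i] (exclusive).
       RunEndEncoded { run_ends: Vec<usize>, values: Vec<String> },
   }

   impl StringColumn {
       // Generic materialisation into the known physical type (plain Utf8).
       pub fn materialize(&self) -> Vec<String> {
           match self {
               StringColumn::Utf8(values) => values.clone(),
               StringColumn::Dictionary { keys, values } => {
                   keys.iter().map(|&k| values[k].clone()).collect()
               }
               StringColumn::RunEndEncoded { run_ends, values } => {
                   let mut out = Vec::new();
                   let mut start = 0;
                   for (&end, value) in run_ends.iter().zip(values) {
                       out.extend(std::iter::repeat(value.clone()).take(end - start));
                       start = end;
                   }
                   out
               }
           }
       }
   }

   // Example UDF body: upper-case every string.
   pub fn upper(input: &StringColumn) -> StringColumn {
       match input {
           // Optimised path: transform only the distinct values, keep the keys.
           StringColumn::Dictionary { keys, values } => StringColumn::Dictionary {
               keys: keys.clone(),
               values: values.iter().map(|v| v.to_uppercase()).collect(),
           },
           // Generic fallback: materialise, then compute on the plain encoding.
           other => StringColumn::Utf8(other.materialize().iter().map(|v| v.to_uppercase()).collect()),
       }
   }
   ```

   A user defined physical type would only need to implement the materialisation step to work with every function that has a generic fallback.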
   #### In substrait
   The [`datafusion_substrait`](https://docs.rs/datafusion-substrait/latest/datafusion_substrait/) crate provides helper functions to enable interoperability between substrait plans and DataFusion's plans. While some effort has been made to support converting from / to DataTypes via `type_variation_reference` ([example here](https://github.com/apache/datafusion/blob/133128840ca3dbea200dcfe84050cb7b82bf94a8/datafusion/substrait/src/logical_plan/producer.rs#L1524-L1535)), dictionaries are not supported as either literal types or cast types, leading to potential errors when trying to encode a valid [LogicalPlan](https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.LogicalPlan.html) into a substrait plan. The usage of logical types would enable a more seamless transition between DataFusion's native logical plan and substrait.
   ### Keeping track of the physical type
   While logical types simplify the list of possible types that can be handled during logical planning, the relation to their underlying physical representation needs to be accounted for when transforming the logical plan into nested [ExecutionPlan](https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html) and [PhysicalExpr](https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.PhysicalExpr.html) nodes, which dictate how the query will execute.
   
   This proposal introduces a new trait that represents the link between a 
logical type and its underlying physical representation:
   
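   ```rust
   use std::sync::Arc;
   
   use arrow::datatypes::DataType;
   
   /// Encoding-agnostic logical type (variants elided, see the table above).
   #[derive(Clone, Debug)]
   pub enum LogicalType {/*...*/}
   
   /// Shared, type-erased handle to a logical/physical type relation.
   pub type TypeRelationRef = Arc<dyn TypeRelation + Send + Sync>;
   
   /// Links a logical type to the physical DataType that stores it.
   pub trait TypeRelation: std::fmt::Debug {
       fn logical(&self) -> &LogicalType;
       fn physical(&self) -> &DataType;
       // ...
   }
   
   /// Relation for Arrow's native DataTypes: the logical type is derived
   /// from the physical one.
   #[derive(Clone, Debug)]
   pub struct NativeType {
       logical: LogicalType,
       physical: DataType,
   }
   
   impl TypeRelation for NativeType {/*...*/}
   impl From<DataType> for NativeType {/*...*/}
   impl From<DataType> for LogicalType {/*...*/}
   
   /// Type used in schemas and plans: wraps either a NativeType or a
   /// user defined TypeRelation.
   #[derive(Clone, Debug)]
   pub struct LogicalPhysicalType(TypeRelationRef);
   
   impl TypeRelation for LogicalPhysicalType {/*...*/}
   ```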
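
   For experimentation, here is a self-contained toy version of the sketch above with hypothetical bodies filled in for a tiny subset of types. `DataType` is a simplified stand-in for Arrow's enum and `BinaryString` is a made-up user defined relation (UTF-8 strings stored as `Binary`); the point is that a native and a custom relation sit behind the same `LogicalPhysicalType` handle.

   ```rust
   use std::fmt::Debug;
   use std::sync::Arc;

   // Simplified stand-in for arrow::datatypes::DataType.
   #[derive(Clone, Debug, PartialEq)]
   pub enum DataType {
       Utf8,
       LargeUtf8,
       Binary,
   }

   // Hypothetical subset of logical types.
   #[derive(Clone, Debug, PartialEq)]
   pub enum LogicalType {
       Utf8,
       Binary,
   }

   impl From<&DataType> for LogicalType {
       fn from(data_type: &DataType) -> Self {
           match data_type {
               DataType::Utf8 | DataType::LargeUtf8 => LogicalType::Utf8,
               DataType::Binary => LogicalType::Binary,
           }
       }
   }

   pub trait TypeRelation: Debug {
       fn logical(&self) -> &LogicalType;
       fn physical(&self) -> &DataType;
   }

   pub type TypeRelationRef = Arc<dyn TypeRelation + Send + Sync>;

   // Native relation: the logical type is derived from the DataType.
   #[derive(Clone, Debug)]
   pub struct NativeType {
       logical: LogicalType,
       physical: DataType,
   }

   impl From<DataType> for NativeType {
       fn from(physical: DataType) -> Self {
           let logical = LogicalType::from(&physical);
           NativeType { logical, physical }
       }
   }

   impl TypeRelation for NativeType {
       fn logical(&self) -> &LogicalType {
           &self.logical
       }
       fn physical(&self) -> &DataType {
           &self.physical
       }
   }

   // Hypothetical user defined relation: UTF-8 strings stored as Binary.
   #[derive(Debug)]
   pub struct BinaryString;

   impl TypeRelation for BinaryString {
       fn logical(&self) -> &LogicalType {
           &LogicalType::Utf8
       }
       fn physical(&self) -> &DataType {
           &DataType::Binary
       }
   }

   // Type-erased handle used by schemas and plans.
   #[derive(Clone, Debug)]
   pub struct LogicalPhysicalType(TypeRelationRef);

   impl LogicalPhysicalType {
       pub fn new(relation: impl TypeRelation + Send + Sync + 'static) -> Self {
           LogicalPhysicalType(Arc::new(relation))
       }
   }

   impl From<DataType> for LogicalPhysicalType {
       fn from(data_type: DataType) -> Self {
           LogicalPhysicalType::new(NativeType::from(data_type))
       }
   }

   impl TypeRelation for LogicalPhysicalType {
       fn logical(&self) -> &LogicalType {
           self.0.logical()
       }
       fn physical(&self) -> &DataType {
           self.0.physical()
       }
   }
   ```

   Because the handle is an `Arc`, cloning a `LogicalPhysicalType` stays cheap even for user defined relations that carry more state.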
   
   While `NativeType` would be primarily used for standard DataTypes and their logical relation, `TypeRelation` is defined to provide support for user defined physical types.
   
   What follows is an exploration of the areas in which `LogicalPhysicalType` would need to be introduced:
   #### A new type of Schema
   To support the creation of `LogicalPhysicalType`, a new schema must be introduced which can be consumed either as a logical schema or to access the underlying physical representation. Currently [DFSchema](https://docs.rs/datafusion/latest/datafusion/common/struct.DFSchema.html) is used throughout DataFusion as a thin wrapper for Arrow's native [Schema](https://docs.rs/arrow/latest/arrow/datatypes/struct.Schema.html) in order to qualify fields originating from potentially different tables. This proposal suggests decoupling DFSchema from its underlying Schema and instead adopting a new Schema-compatible structure (`LogicalPhysicalSchema`) with DataTypes replaced by `LogicalPhysicalType`. This would also mean introducing a new Field-compatible structure (`LogicalPhysicalField`) which uses `LogicalPhysicalType` in place of the DataType of Arrow's native [Field](https://docs.rs/arrow/latest/arrow/datatypes/struct.Field.html).
   
   DFSchema would be used by DataFusion's planning and execution engine to 
derive either logical or physical type information of each field. It should 
retain the current interoperability with Schema (and additionally the new 
`LogicalPhysicalSchema`) allowing easy `Into` & `From` conversion.
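
   A minimal sketch of the schema side, assuming hypothetical `LogicalPhysicalField` / `LogicalPhysicalSchema` structs that keep both types per field and expose either view. `Field` and `Schema` are simplified stand-ins for Arrow's types (no nullability or metadata), and `LogicalPhysicalType` is flattened to a plain struct instead of the trait-object handle for brevity.

   ```rust
   // Simplified stand-ins for Arrow's DataType, Field and Schema.
   #[derive(Clone, Debug, PartialEq)]
   pub enum DataType {
       Int64,
       Utf8,
       Utf8View,
   }

   #[derive(Clone, Debug, PartialEq)]
   pub struct Field {
       pub name: String,
       pub data_type: DataType,
   }

   #[derive(Clone, Debug, PartialEq)]
   pub struct Schema {
       pub fields: Vec<Field>,
   }

   // Hypothetical subset of logical types.
   #[derive(Clone, Debug, PartialEq)]
   pub enum LogicalType {
       Int64,
       Utf8,
   }

   // Flattened stand-in for LogicalPhysicalType: both sides of the relation.
   #[derive(Clone, Debug, PartialEq)]
   pub struct LogicalPhysicalType {
       pub logical: LogicalType,
       pub physical: DataType,
   }

   impl From<DataType> for LogicalPhysicalType {
       fn from(physical: DataType) -> Self {
           let logical = match physical {
               DataType::Int64 => LogicalType::Int64,
               DataType::Utf8 | DataType::Utf8View => LogicalType::Utf8,
           };
           LogicalPhysicalType { logical, physical }
       }
   }

   #[derive(Clone, Debug, PartialEq)]
   pub struct LogicalPhysicalField {
       pub name: String,
       pub data_type: LogicalPhysicalType,
   }

   #[derive(Clone, Debug, PartialEq)]
   pub struct LogicalPhysicalSchema {
       pub fields: Vec<LogicalPhysicalField>,
   }

   impl From<Schema> for LogicalPhysicalSchema {
       fn from(schema: Schema) -> Self {
           let fields = schema
               .fields
               .into_iter()
               .map(|f| LogicalPhysicalField { name: f.name, data_type: f.data_type.into() })
               .collect();
           LogicalPhysicalSchema { fields }
       }
   }

   impl LogicalPhysicalSchema {
       // Logical view, used during logical planning.
       pub fn logical_types(&self) -> Vec<&LogicalType> {
           self.fields.iter().map(|f| &f.data_type.logical).collect()
       }

       // Physical view, used when building the physical plan.
       pub fn to_physical(&self) -> Schema {
           let fields = self
               .fields
               .iter()
               .map(|f| Field { name: f.name.clone(), data_type: f.data_type.physical.clone() })
               .collect();
           Schema { fields }
       }
   }
   ```

   Round-tripping through `to_physical()` returns the original Schema, which is what keeps the existing Schema interoperability intact.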
   #### Type sources
   Types in plans are sourced from Arrow's native [Schema](https://docs.rs/arrow/latest/arrow/datatypes/struct.Schema.html) returned by implementations of [TableSource](https://docs.rs/datafusion/latest/datafusion/logical_expr/trait.TableSource.html) / [TableProvider](https://docs.rs/datafusion/latest/datafusion/datasource/provider/trait.TableProvider.html), from variable DataTypes returned by [VarProvider](https://docs.rs/datafusion/latest/datafusion/logical_expr/var_provider/trait.VarProvider.html), and from [ScalarValue](https://docs.rs/datafusion/latest/datafusion/common/enum.ScalarValue.html). To allow the definition of custom `LogicalPhysicalType`s, these type sources should be edited to return `LogicalPhysicalSchema` / `LogicalPhysicalType`.
   ##### Tables
   For tables a non-breaking way of editing the trait to support 
`LogicalPhysicalSchema` could be:
   
   1. Declare a new trait method `logical_physical_schema() -> LogicalPhysicalSchema`, whose default implementation calls [`schema()`](https://docs.rs/datafusion/latest/datafusion/logical_expr/trait.TableSource.html#tymethod.schema) and converts the result to `LogicalPhysicalSchema` without introducing any custom `LogicalPhysicalType`. Implementers are free to override this method and add custom `LogicalPhysicalType`s.
   2. Declare the existing `schema()` method to return `impl 
Into<LogicalPhysicalSchema>`.
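
   The first option can be sketched as a default trait method. The `TableSource` trait below is a hypothetical, heavily reduced stand-in (with equally simplified schema types), not DataFusion's actual trait: existing implementers only provide `schema()` and keep compiling, while an implementer that wants custom types overrides `logical_physical_schema()`.

   ```rust
   // Simplified stand-ins for the schema types involved.
   #[derive(Clone, Debug, PartialEq)]
   pub enum DataType {
       Binary,
       Utf8,
   }

   #[derive(Clone, Debug, PartialEq)]
   pub enum LogicalType {
       Binary,
       Utf8,
   }

   #[derive(Clone, Debug)]
   pub struct Schema {
       pub fields: Vec<(String, DataType)>,
   }

   // (name, logical type, physical type) per field.
   #[derive(Clone, Debug)]
   pub struct LogicalPhysicalSchema {
       pub fields: Vec<(String, LogicalType, DataType)>,
   }

   impl From<Schema> for LogicalPhysicalSchema {
       fn from(schema: Schema) -> Self {
           let fields = schema
               .fields
               .into_iter()
               .map(|(name, physical)| {
                   // Native mapping only: no custom logical types are introduced.
                   let logical = match physical {
                       DataType::Binary => LogicalType::Binary,
                       DataType::Utf8 => LogicalType::Utf8,
                   };
                   (name, logical, physical)
               })
               .collect();
           LogicalPhysicalSchema { fields }
       }
   }

   // Hypothetical, heavily reduced TableSource.
   pub trait TableSource {
       fn schema(&self) -> Schema;

       // New method; the default derives it from schema().
       fn logical_physical_schema(&self) -> LogicalPhysicalSchema {
           self.schema().into()
       }
   }

   // Existing implementation: untouched, inherits the default.
   pub struct LegacyTable;

   impl TableSource for LegacyTable {
       fn schema(&self) -> Schema {
           Schema { fields: vec![("payload".to_string(), DataType::Binary)] }
       }
   }

   // New implementation: exposes its Binary column as logical Utf8.
   pub struct BytesAsStringTable;

   impl TableSource for BytesAsStringTable {
       fn schema(&self) -> Schema {
           Schema { fields: vec![("payload".to_string(), DataType::Binary)] }
       }

       fn logical_physical_schema(&self) -> LogicalPhysicalSchema {
           let fields = vec![("payload".to_string(), LogicalType::Utf8, DataType::Binary)];
           LogicalPhysicalSchema { fields }
       }
   }
   ```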
   
   > Open question: Should we really introduce a new schema type or should we 
reuse DFSchema? The qualifiers for fields in DFSchema should not be specified 
by the Tables themselves.
   ##### VarProvider
   
[VarProvider](https://docs.rs/datafusion/latest/datafusion/logical_expr/var_provider/trait.VarProvider.html)
 needs to be edited in order to return a `LogicalPhysicalType` when getting the 
type of the variable, while the actual variable can very well remain a 
`ScalarValue`.
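
   A minimal sketch of that change, assuming a hypothetical reduced `VarProvider` (the real trait's methods take different arguments and return `Result`s) and simplified stand-ins for `DataType` and `ScalarValue`. `MapVarProvider` is a made-up example implementation; only the type accessor changes, while the value stays a plain scalar.

   ```rust
   use std::collections::HashMap;

   // Simplified stand-ins for DataType, logical types and ScalarValue.
   #[derive(Clone, Debug, PartialEq)]
   pub enum DataType {
       Int64,
       Utf8View,
   }

   #[derive(Clone, Debug, PartialEq)]
   pub enum LogicalType {
       Int64,
       Utf8,
   }

   #[derive(Clone, Debug, PartialEq)]
   pub enum ScalarValue {
       Int64(i64),
       Utf8View(String),
   }

   impl ScalarValue {
       pub fn data_type(&self) -> DataType {
           match self {
               ScalarValue::Int64(_) => DataType::Int64,
               ScalarValue::Utf8View(_) => DataType::Utf8View,
           }
       }
   }

   #[derive(Clone, Debug, PartialEq)]
   pub struct LogicalPhysicalType {
       pub logical: LogicalType,
       pub physical: DataType,
   }

   // Hypothetical reduced VarProvider.
   pub trait VarProvider {
       // Variables are still plain ScalarValues.
       fn get_value(&self, var_names: &[String]) -> Option<ScalarValue>;
       // Would previously have returned a DataType.
       fn get_type(&self, var_names: &[String]) -> Option<LogicalPhysicalType>;
   }

   // Example provider backed by a map of dotted variable names.
   pub struct MapVarProvider {
       pub vars: HashMap<String, ScalarValue>,
   }

   impl VarProvider for MapVarProvider {
       fn get_value(&self, var_names: &[String]) -> Option<ScalarValue> {
           self.vars.get(&var_names.join(".")).cloned()
       }

       fn get_type(&self, var_names: &[String]) -> Option<LogicalPhysicalType> {
           let physical = self.get_value(var_names)?.data_type();
           let logical = match physical {
               DataType::Int64 => LogicalType::Int64,
               DataType::Utf8View => LogicalType::Utf8,
           };
           Some(LogicalPhysicalType { logical, physical })
       }
   }
   ```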
   ##### ScalarValue
   ScalarValue should be wrapped in order to have a way of retrieving both its 
logical and its underlying physical type. When reasoning about logical plans it 
should be treated as its logical type while its physical properties should be 
accessed exclusively by the physical plan.
   
   > Open question: Should ScalarValue split into a LogicalScalarValue and a PhysicalScalarValue? Or should it just provide generic information during logical planning (e.g. its logical type, `is_null()`, `is_zero()`) without accessing the underlying physical representation?
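
   To illustrate the second option of the open question (not to settle it), here is a hypothetical wrapper that exposes only logical information to the logical planner. `ScalarValue` is a simplified stand-in with a few variants, and `LogicalScalar` with its methods is made up for this sketch: `logical_type()`, `is_null()` and `is_zero()` ignore the encoding, while the physical value is only reachable through an explicit `into_physical()` meant for the physical planner.

   ```rust
   // Simplified stand-in for DataFusion's ScalarValue (physical representation).
   #[derive(Clone, Debug, PartialEq)]
   pub enum ScalarValue {
       Int64(Option<i64>),
       Utf8(Option<String>),
       LargeUtf8(Option<String>),
       // Dictionary-encoded scalar; the key type is omitted for brevity.
       Dictionary(Box<ScalarValue>),
   }

   #[derive(Clone, Debug, PartialEq)]
   pub enum LogicalType {
       Int64,
       Utf8,
   }

   // Hypothetical wrapper handed to logical planning.
   #[derive(Clone, Debug)]
   pub struct LogicalScalar(ScalarValue);

   impl LogicalScalar {
       pub fn new(value: ScalarValue) -> Self {
           LogicalScalar(value)
       }

       // Strip encodings so logical questions ignore the physical layout.
       fn decoded(&self) -> &ScalarValue {
           let mut value = &self.0;
           while let ScalarValue::Dictionary(inner) = value {
               value = &**inner;
           }
           value
       }

       pub fn logical_type(&self) -> LogicalType {
           match self.decoded() {
               ScalarValue::Int64(_) => LogicalType::Int64,
               ScalarValue::Utf8(_) | ScalarValue::LargeUtf8(_) => LogicalType::Utf8,
               ScalarValue::Dictionary(_) => unreachable!("decoded() removes dictionaries"),
           }
       }

       pub fn is_null(&self) -> bool {
           matches!(
               self.decoded(),
               ScalarValue::Int64(None) | ScalarValue::Utf8(None) | ScalarValue::LargeUtf8(None)
           )
       }

       pub fn is_zero(&self) -> bool {
           matches!(self.decoded(), ScalarValue::Int64(Some(0)))
       }

       // Physical access, intended for the physical planner only.
       pub fn into_physical(self) -> ScalarValue {
           self.0
       }
   }
   ```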
   ### Physical execution
   For physical planning and execution, much like the invocation of UDFs, [ExecutionPlan](https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html) and [PhysicalExpr](https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.PhysicalExpr.html) must also be granted access to the `LogicalPhysicalType`, so that they can perform optimised execution for a subset of supported physical types and then fall back to a generic implementation that materialises other types to a known physical type. This can be achieved by replacing the use of DataType and Schema with, respectively, `LogicalPhysicalType` and `LogicalPhysicalSchema`.
   ### Impact on downstream dependencies
   Care must be taken not to introduce breaking changes for downstream crates and dependencies that build on top of DataFusion.
   
   The most impactful changes introduced by this proposal are the `LogicalPhysicalType`, `LogicalPhysicalSchema` and `LogicalPhysicalField` types. These structures would replace most of the mentions of DataType, Schema and Field in the DataFusion codebase. Type sources (TableProvider / TableSource, VarProvider, and ScalarValue) and Logical / ExecutionPlan nodes would be greatly affected by this change. This effect can be mitigated by providing good `Into` & `From` implementations for the new types and by changing existing function arguments and return types to `impl Into<LogicalPhysical*>`, but it will still break a lot of things.
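
   A minimal sketch of that mitigation with hypothetical, simplified types: a helper that would previously have taken a `Schema` instead accepts `impl Into<LogicalPhysicalSchema>`, so existing call sites passing a `Schema` keep compiling while new call sites can pass a `LogicalPhysicalSchema` directly. `logical_columns` is a made-up function, not part of DataFusion.

   ```rust
   // Simplified stand-ins for the old and new schema types.
   #[derive(Clone, Debug, PartialEq)]
   pub enum DataType {
       Int32,
       Utf8View,
   }

   #[derive(Clone, Debug, PartialEq)]
   pub enum LogicalType {
       Int32,
       Utf8,
   }

   #[derive(Clone, Debug)]
   pub struct Schema {
       pub fields: Vec<(String, DataType)>,
   }

   #[derive(Clone, Debug)]
   pub struct LogicalPhysicalSchema {
       pub fields: Vec<(String, LogicalType, DataType)>,
   }

   // The conversion that keeps existing callers working.
   impl From<Schema> for LogicalPhysicalSchema {
       fn from(schema: Schema) -> Self {
           let fields = schema
               .fields
               .into_iter()
               .map(|(name, physical)| {
                   let logical = match physical {
                       DataType::Int32 => LogicalType::Int32,
                       DataType::Utf8View => LogicalType::Utf8,
                   };
                   (name, logical, physical)
               })
               .collect();
           LogicalPhysicalSchema { fields }
       }
   }

   // Hypothetical planner helper: before the change it would have taken a
   // `Schema`; `impl Into<LogicalPhysicalSchema>` accepts either type.
   pub fn logical_columns(schema: impl Into<LogicalPhysicalSchema>) -> Vec<(String, LogicalType)> {
       let schema: LogicalPhysicalSchema = schema.into();
       schema
           .fields
           .into_iter()
           .map(|(name, logical, _physical)| (name, logical))
           .collect()
   }
   ```

   This only shields callers of functions; code that matches on `DataType` values or implements traits whose signatures change would still need updating.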
   #### Case study: datafusion-comet
   [`datafusion-comet`](https://github.com/apache/datafusion-comet) is a high-performance accelerator for Apache Spark, built on top of DataFusion. A [fork](https://github.com/notfilippo/datafusion-comet/tree/fr/logical-types) of the project containing changes from this proposal currently compiles without modifications. As more features in this proposal are implemented, namely logical signatures for UDFs, some refactoring might be required (e.g. for `CometScalarFunction` and other functions defined in the codebase). Refer to the draft's TODOs to see what's missing.
   ### Draft implementation
   The draft work can be tracked via 
https://github.com/apache/datafusion/pull/11160.
   ### To Do
   - [ ] ScalarValue logical implementation
   - [ ] UDFs Signature implementation

