jorgecarleitao commented on a change in pull request #1066:
URL: https://github.com/apache/arrow-datafusion/pull/1066#discussion_r719971228



##########
File path: datafusion/src/physical_plan/udf.rs
##########
@@ -92,6 +95,23 @@ impl ScalarUDF {
             name: name.to_owned(),
             signature: signature.clone(),
             return_type: return_type.clone(),
+            volatility: Default::default(),

Review comment:
       I would move this to the function signature: volatility is an important 
concept that IMO should be part of `new` (which is where all properties are 
introduced). Users are free to define their own `new_scalar` helper with a default 
volatility if their scalars are systematically volatile.
   

##########
File path: datafusion/src/physical_plan/expressions/binary.rs
##########
@@ -512,46 +512,32 @@ pub fn binary_operator_data_type(
     }
 }
 
-impl PhysicalExpr for BinaryExpr {
-    /// Return a reference to Any that can be used for downcasting
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
-        binary_operator_data_type(
-            &self.left.data_type(input_schema)?,
-            &self.op,
-            &self.right.data_type(input_schema)?,
-        )
-    }
-
-    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
-        Ok(self.left.nullable(input_schema)? || 
self.right.nullable(input_schema)?)
-    }
-
-    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
-        let left_value = self.left.evaluate(batch)?;
-        let right_value = self.right.evaluate(batch)?;
+impl BinaryExpr {

Review comment:
       This is just a simplification, right?

##########
File path: datafusion/src/physical_plan/functions.rs
##########
@@ -543,6 +553,28 @@ macro_rules! invoke_if_unicode_expressions_feature_flag {
     };
 }
 
+///Function volatility determines when a function can be inlined or in the 
future have evaluations elided when the arguments are the same
+///Immutable - a pure function which always remains the same
+///Stable - Maybe not required?. Used for functions which can modify the db 
but are stable for multiple calls in statement
+///Volatile - Functions where output can vary for each call.
+///For more information see 
https://www.postgresql.org/docs/current/xfunc-volatility.html
+#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
+#[repr(u32)]
+pub enum FunctionVolatility {
+    ///Immutable - a pure function which always remains the same
+    Immutable = 0,
+    ///Stable - Maybe not required?. Used for functions which can modify the 
db but are stable for multiple calls in statement is this relevant for 
datafusion as in mem?
+    Stable = 1,
+    ///Volatile - Functions where output can vary for each call.
+    Volatile = 2,
+}
+
+impl Default for FunctionVolatility {

Review comment:
       IMO there should be no default volatility (i.e. I am -1 on this). Defaults make 
it very easy for someone to just use the default and ignore the semantics of 
volatility.
   
   For example, volatility in Spark never made it to PySpark, which made it 
impossible for users to write immutable UDFs, which in turn prevented some filter 
pushdowns from being applied (even when the user _knows_ that the UDF is immutable).
   
   

##########
File path: datafusion/src/physical_plan/functions.rs
##########
@@ -543,6 +553,28 @@ macro_rules! invoke_if_unicode_expressions_feature_flag {
     };
 }
 
+///Function volatility determines when a function can be inlined or in the 
future have evaluations elided when the arguments are the same
+///Immutable - a pure function which always remains the same
+///Stable - Maybe not required?. Used for functions which can modify the db 
but are stable for multiple calls in statement
+///Volatile - Functions where output can vary for each call.
+///For more information see 
https://www.postgresql.org/docs/current/xfunc-volatility.html
+#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
+#[repr(u32)]

Review comment:
       Why this representation (`#[repr(u32)]` with explicit discriminants)?

##########
File path: datafusion/src/physical_plan/functions.rs
##########
@@ -1428,6 +1460,423 @@ where
     })
 }
 
+pub(crate) fn create_immutable_impl(

Review comment:
       Is this needed to make (most/all) built-in functions immutable?

##########
File path: datafusion/src/physical_plan/udf.rs
##########
@@ -42,6 +43,8 @@ pub struct ScalarUDF {
     pub signature: Signature,
     /// Return type
     pub return_type: ReturnTypeFunction,
+    /// Volatility of the UDF

Review comment:
       ```suggestion
       /// [`FunctionVolatility`] of the UDF
   ```

##########
File path: datafusion/src/optimizer/tokomak/datatype.rs
##########
@@ -0,0 +1,166 @@
+use crate::error::DataFusionError;
+use arrow::datatypes::{DataType, IntervalUnit, TimeUnit};
+use egg::*;
+use std::convert::TryFrom;
+use std::fmt::Display;
+use std::str::FromStr;
+
+define_language! {
+    #[derive(Copy)]
+    pub enum TokomakDataType {
+        "date32" = Date32,
+        "date64" = Date64,
+        "bool" = Boolean,
+        "int8" = Int8,
+        "int16" =Int16,
+        "int32" =Int32,
+        "int64" =Int64,
+        "uint8" =UInt8,
+        "uint16" =UInt16,
+        "uint32" =UInt32,
+        "uint64" =UInt64,
+        "float16" =Float16,
+        "float32" =Float32,
+        "float64" =Float64,
+        "utf8"=Utf8,
+        "largeutf8"=LargeUtf8,
+        "time(s)"=TimestampSecond,
+        "time(ms)"=TimestampMillisecond,
+        "time(us)"=TimestampMicrosecond,
+        "time(ns)"=TimestampNanosecond,
+        "interval(yearmonth)"=IntervalYearMonth,
+        "interval(daytime)"=IntervalDayTime,
+    }
+}
+
+impl Display for TokomakDataType {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.write_fmt(format_args!("{:?}", self))
+    }
+}
+
+impl Into<DataType> for &TokomakDataType {
+    fn into(self) -> DataType {
+        let v = *self;
+        v.into()
+    }
+}
+
+impl Into<DataType> for TokomakDataType {
+    fn into(self) -> DataType {
+        match self {
+            TokomakDataType::Date32 => DataType::Date32,
+            TokomakDataType::Date64 => DataType::Date64,
+            TokomakDataType::Boolean => DataType::Boolean,
+            TokomakDataType::Int8 => DataType::Int8,
+            TokomakDataType::Int16 => DataType::Int16,
+            TokomakDataType::Int32 => DataType::Int32,
+            TokomakDataType::Int64 => DataType::Int64,
+            TokomakDataType::UInt8 => DataType::UInt8,
+            TokomakDataType::UInt16 => DataType::UInt16,
+            TokomakDataType::UInt32 => DataType::UInt32,
+            TokomakDataType::UInt64 => DataType::UInt64,
+            TokomakDataType::Float16 => DataType::Float16,
+            TokomakDataType::Float32 => DataType::Float32,
+            TokomakDataType::Float64 => DataType::Float64,
+            TokomakDataType::Utf8 => DataType::Utf8,
+            TokomakDataType::LargeUtf8 => DataType::LargeUtf8,
+            TokomakDataType::TimestampSecond => {
+                DataType::Timestamp(TimeUnit::Second, None)
+            }
+            TokomakDataType::TimestampMillisecond => {
+                DataType::Timestamp(TimeUnit::Millisecond, None)
+            }
+            TokomakDataType::TimestampMicrosecond => {
+                DataType::Timestamp(TimeUnit::Microsecond, None)
+            }
+            TokomakDataType::TimestampNanosecond => {
+                DataType::Timestamp(TimeUnit::Nanosecond, None)

Review comment:
       (No action needed, just a note.) IMO we should solve Timestamp-with-timezone 
ASAP: the Arrow format supports timezones, and here we are already assuming that 
we do not use them. This is tricky, and code like this only makes it harder to 
support timezones later.
   
   Note that the "big" problem with timezones is that the set of possible 
`DataType::Timestamp()` types is virtually unbounded (one per timezone string), 
which usually makes type systems a bit difficult to build on top of.

##########
File path: datafusion/src/physical_plan/functions.rs
##########
@@ -543,6 +553,28 @@ macro_rules! invoke_if_unicode_expressions_feature_flag {
     };
 }
 
+///Function volatility determines when a function can be inlined or in the 
future have evaluations elided when the arguments are the same
+///Immutable - a pure function which always remains the same
+///Stable - Maybe not required?. Used for functions which can modify the db 
but are stable for multiple calls in statement
+///Volatile - Functions where output can vary for each call.
+///For more information see 
https://www.postgresql.org/docs/current/xfunc-volatility.html
+#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
+#[repr(u32)]
+pub enum FunctionVolatility {
+    ///Immutable - a pure function which always remains the same
+    Immutable = 0,
+    ///Stable - Maybe not required?. Used for functions which can modify the 
db but are stable for multiple calls in statement is this relevant for 
datafusion as in mem?
+    Stable = 1,
+    ///Volatile - Functions where output can vary for each call.
+    Volatile = 2,

Review comment:
       ```suggestion
       /// A pure function - one that always returns the same output given the same inputs and 
has no side-effects. (E.g. `SQRT`)
       Immutable,
       /// A function whose output can vary for each call. (E.g. `CURRENT_DATE`)
       Volatile,
   ```

##########
File path: datafusion/src/physical_plan/functions.rs
##########
@@ -543,6 +553,28 @@ macro_rules! invoke_if_unicode_expressions_feature_flag {
     };
 }
 
+///Function volatility determines when a function can be inlined or in the 
future have evaluations elided when the arguments are the same
+///Immutable - a pure function which always remains the same
+///Stable - Maybe not required?. Used for functions which can modify the db 
but are stable for multiple calls in statement
+///Volatile - Functions where output can vary for each call.
+///For more information see 
https://www.postgresql.org/docs/current/xfunc-volatility.html

Review comment:
       ```suggestion
   /// Determines when a function can be inlined or in the future have 
evaluations elided when the arguments are the same
   /// For more information see 
https://www.postgresql.org/docs/current/xfunc-volatility.html
   ```
   
   We can use the description of each variant to document the enum.

##########
File path: datafusion/src/physical_plan/udf.rs
##########
@@ -92,6 +95,23 @@ impl ScalarUDF {
             name: name.to_owned(),
             signature: signature.clone(),
             return_type: return_type.clone(),
+            volatility: Default::default(),
+            fun: fun.clone(),
+        }
+    }
+    /// Create a new ScalarUDF with the specified volatility
+    pub fn with_volatility(
+        name: &str,
+        signature: &Signature,
+        return_type: &ReturnTypeFunction,
+        fun: &ScalarFunctionImplementation,
+        volatility: FunctionVolatility,
+    ) -> Self {
+        Self {
+            name: name.to_owned(),
+            signature: signature.clone(),
+            return_type: return_type.clone(),
+            volatility,

Review comment:
       not needed :)

##########
File path: datafusion/src/physical_plan/functions.rs
##########
@@ -543,6 +553,28 @@ macro_rules! invoke_if_unicode_expressions_feature_flag {
     };
 }
 
+///Function volatility determines when a function can be inlined or in the 
future have evaluations elided when the arguments are the same
+///Immutable - a pure function which always remains the same
+///Stable - Maybe not required?. Used for functions which can modify the db 
but are stable for multiple calls in statement
+///Volatile - Functions where output can vary for each call.
+///For more information see 
https://www.postgresql.org/docs/current/xfunc-volatility.html
+#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]

Review comment:
       ```suggestion
   #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
   ```
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to