Jefffrey commented on code in PR #19895:
URL: https://github.com/apache/datafusion/pull/19895#discussion_r2723863008


##########
datafusion/common/src/scalar/mod.rs:
##########
@@ -428,6 +429,8 @@ pub enum ScalarValue {
     Union(Option<(i8, Box<ScalarValue>)>, UnionFields, UnionMode),
     /// Dictionary type: index type and value
     Dictionary(Box<DataType>, Box<ScalarValue>),
+    /// (run-ends field, value field, value)
+    RunEndEncoded(FieldRef, FieldRef, Box<ScalarValue>),

Review Comment:
   Mimicking the arrow type where it stores fields:
   
   - 
https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html#variant.RunEndEncoded
   
   I initially tried storing only the run-ends `DataType` and the `ScalarValue` 
value, but figured it would be better to try to be as accurate as possible 🤔 



##########
datafusion/proto-common/src/to_proto/mod.rs:
##########
@@ -191,37 +193,44 @@ impl TryFrom<&DataType> for 
protobuf::arrow_type::ArrowTypeEnum {
                     value: Some(Box::new(value_type.as_ref().try_into()?)),
                 }))
             }
-            DataType::Decimal32(precision, scale) => 
Self::Decimal32(protobuf::Decimal32Type {
-                precision: *precision as u32,
-                scale: *scale as i32,
-            }),
-            DataType::Decimal64(precision, scale) => 
Self::Decimal64(protobuf::Decimal64Type {
-                precision: *precision as u32,
-                scale: *scale as i32,
-            }),
-            DataType::Decimal128(precision, scale) => 
Self::Decimal128(protobuf::Decimal128Type {
-                precision: *precision as u32,
-                scale: *scale as i32,
-            }),
-            DataType::Decimal256(precision, scale) => 
Self::Decimal256(protobuf::Decimal256Type {
-                precision: *precision as u32,
-                scale: *scale as i32,
-            }),
-            DataType::Map(field, sorted) => {
-                Self::Map(Box::new(
-                    protobuf::Map {
-                        field_type: Some(Box::new(field.as_ref().try_into()?)),
-                        keys_sorted: *sorted,
-                    }
-                ))
-            }
-            DataType::RunEndEncoded(_, _) => {
-                return Err(Error::General(
-                    "Proto serialization error: The RunEndEncoded data type is 
not yet supported".to_owned()
-                ))
+            DataType::Decimal32(precision, scale) => {

Review Comment:
   The only real change here is for `RunEndEncoded`; for some reason, 
formatting changes were also applied to the other arms here



##########
datafusion/common/src/scalar/mod.rs:
##########
@@ -7256,6 +7494,30 @@ mod tests {
         }
     }
 
+    #[test]
+    fn roundtrip_run_array() {
+        // Comparison logic in round_trip_through_scalar doesn't work for 
RunArrays
+        // so we have a custom test for them

Review Comment:
   Actually https://github.com/apache/arrow-rs/pull/9213 might fix this for us 
🤔 
   
   Might leave a comment to this effect



##########
datafusion/common/src/scalar/mod.rs:
##########
@@ -9228,6 +9490,175 @@ mod tests {
         assert_eq!(value.len(), buffers[0].len());
     }
 
+    #[test]
+    fn test_to_array_of_size_run_end_encoded() {
+        fn run_test<R: RunEndIndexType>() {
+            let value = Box::new(ScalarValue::Float32(Some(1.0)));
+            let size = 5;
+            let scalar = ScalarValue::RunEndEncoded(
+                Field::new("run_ends", R::DATA_TYPE, false).into(),
+                Field::new("values", DataType::Float32, true).into(),
+                value.clone(),
+            );
+            let array = scalar.to_array_of_size(size).unwrap();
+            let array = array.as_run::<R>();
+            let array = array.downcast::<Float32Array>().unwrap();
+            assert_eq!(vec![Some(1.0); size], 
array.into_iter().collect::<Vec<_>>());
+            assert_eq!(1, array.values().len());
+        }
+
+        run_test::<Int16Type>();
+        run_test::<Int32Type>();
+        run_test::<Int64Type>();
+
+        let scalar = ScalarValue::RunEndEncoded(
+            Field::new("run_ends", DataType::Int16, false).into(),
+            Field::new("values", DataType::Float32, true).into(),
+            Box::new(ScalarValue::Float32(Some(1.0))),
+        );
+        let err = scalar.to_array_of_size(i16::MAX as usize + 10).unwrap_err();
+        assert_eq!(
+            "Execution error: Cannot construct RunArray of size 32777: 
Overflows run-ends type Int16",
+            err.to_string()
+        )
+    }
+
+    #[test]
+    fn test_eq_array_run_end_encoded() {
+        let run_ends = Int16Array::from(vec![1, 3]);
+        let values = Float32Array::from(vec![None, Some(1.0)]);
+        let run_array =
+            Arc::new(RunArray::try_new(&run_ends, &values).unwrap()) as 
ArrayRef;
+
+        let scalar = ScalarValue::RunEndEncoded(
+            Field::new("run_ends", DataType::Int16, false).into(),
+            Field::new("values", DataType::Float32, true).into(),
+            Box::new(ScalarValue::Float32(None)),
+        );
+        assert!(scalar.eq_array(&run_array, 0).unwrap());
+
+        let scalar = ScalarValue::RunEndEncoded(
+            Field::new("run_ends", DataType::Int16, false).into(),
+            Field::new("values", DataType::Float32, true).into(),
+            Box::new(ScalarValue::Float32(Some(1.0))),
+        );
+        assert!(scalar.eq_array(&run_array, 1).unwrap());
+        assert!(scalar.eq_array(&run_array, 2).unwrap());
+
+        // value types must match
+        let scalar = ScalarValue::RunEndEncoded(
+            Field::new("run_ends", DataType::Int16, false).into(),
+            Field::new("values", DataType::Float64, true).into(),
+            Box::new(ScalarValue::Float64(Some(1.0))),
+        );
+        let err = scalar.eq_array(&run_array, 1).unwrap_err();
+        let expected = "Internal error: could not cast array of type Float32 
to 
arrow_array::array::primitive_array::PrimitiveArray<arrow_array::types::Float64Type>";
+        assert!(err.to_string().starts_with(expected));

Review Comment:
   Needed to use `starts_with` since the backtrace feature can affect the error 
message, so direct equality can succeed for `cargo test` but fail in CI



##########
datafusion/common/src/scalar/mod.rs:
##########
@@ -1573,6 +1601,8 @@ impl ScalarValue {
             | DataType::Float16
             | DataType::Float32
             | DataType::Float64
+            | DataType::Decimal32(_, _)
+            | DataType::Decimal64(_, _)

Review Comment:
   Little fix since we were missing these



##########
datafusion/common/src/scalar/mod.rs:
##########
@@ -2597,6 +2641,94 @@ impl ScalarValue {
                     _ => unreachable!("Invalid dictionary keys type: {}", 
key_type),
                 }
             }
+            DataType::RunEndEncoded(run_ends_field, value_field) => {
+                fn make_run_array<R: RunEndIndexType>(
+                    scalars: impl IntoIterator<Item = ScalarValue>,
+                    run_ends_field: &FieldRef,
+                    values_field: &FieldRef,
+                ) -> Result<ArrayRef> {
+                    let mut scalars = scalars.into_iter();
+
+                    let mut run_ends = vec![];
+                    let mut value_scalars = vec![];
+
+                    let mut len = R::Native::ONE;
+                    let mut current =
+                        if let Some(ScalarValue::RunEndEncoded(_, _, scalar)) =
+                            scalars.next()
+                        {
+                            *scalar
+                        } else {
+                            // We are guaranteed to have one element of correct
+                            // type because we peeked above
+                            unreachable!()
+                        };
+                    for scalar in scalars {
+                        let scalar = match scalar {
+                            ScalarValue::RunEndEncoded(
+                                inner_run_ends_field,
+                                inner_value_field,
+                                scalar,
+                            ) if &inner_run_ends_field == run_ends_field
+                                && &inner_value_field == values_field =>
+                            {
+                                *scalar
+                            }
+                            _ => {
+                                return _exec_err!(
+                                    "Expected RunEndEncoded scalar with 
run-ends field {run_ends_field} but got: {scalar:?}"
+                                );
+                            }
+                        };
+
+                        // new run
+                        if scalar != current {
+                            run_ends.push(len);
+                            value_scalars.push(current);
+                            current = scalar;
+                        }
+
+                        len = len.add_checked(R::Native::ONE).map_err(|_| {
+                            DataFusionError::Execution(format!(
+                                "Cannot construct RunArray: Overflows run-ends 
type {}",
+                                run_ends_field.data_type()
+                            ))
+                        })?;
+                    }
+
+                    run_ends.push(len);
+                    value_scalars.push(current);
+
+                    let run_ends = 
PrimitiveArray::<R>::from_iter_values(run_ends);
+                    let values = ScalarValue::iter_to_array(value_scalars)?;
+
+                    // Using ArrayDataBuilder so we can maintain the fields

Review Comment:
   I think this is the only way to construct `RunArray`s with the fields we 
want, since `try_new` creates the fields for us:
   
   
https://github.com/apache/arrow-rs/blob/ddb6c42194fa45516e1bd4a27cdacf10fda56b5a/arrow-array/src/array/run_array.rs#L99-L105



##########
datafusion/common/src/scalar/mod.rs:
##########
@@ -1660,8 +1698,7 @@ impl ScalarValue {
                 }
             }
 
-            // Unsupported types for now
-            _ => {
+            DataType::ListView(_) | DataType::LargeListView(_) => {

Review Comment:
   Just getting rid of the catch-all to be more rigorous



##########
datafusion/common/src/scalar/mod.rs:
##########
@@ -2597,6 +2641,94 @@ impl ScalarValue {
                     _ => unreachable!("Invalid dictionary keys type: {}", 
key_type),
                 }
             }
+            DataType::RunEndEncoded(run_ends_field, value_field) => {

Review Comment:
   We're building the `RunArray` efficiently here. Unlike the dictionary case 
above, which would require keeping a hashmap of values to build an efficient 
dictionary array, run arrays are simpler in that we just need to track when a 
new run starts.
   
   Most of the verbosity here is related to destructuring input `ScalarValue`s 
and ensuring we have consistent types from them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to