This is an automated email from the ASF dual-hosted git repository. comphead pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push: new eb4d4eb5d fix: support `map_values` (#1835) eb4d4eb5d is described below commit eb4d4eb5d784b533fe3d5fea5eaa640760886fd1 Author: Oleks V <comph...@users.noreply.github.com> AuthorDate: Tue Jun 3 16:13:24 2025 -0700 fix: support `map_values` (#1835) * fix: support `map_values` --- native/core/src/execution/planner.rs | 6 -- native/core/src/parquet/parquet_support.rs | 77 +++++++++++----------- .../src/array_funcs/get_array_struct_fields.rs | 44 ++++++++++--- .../org/apache/comet/serde/QueryPlanSerde.scala | 8 +-- .../apache/comet/exec/CometNativeReaderSuite.scala | 29 ++++---- 5 files changed, 90 insertions(+), 74 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 94e79e4e0..8ae8c9c02 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -2954,14 +2954,8 @@ mod tests { })), }; - let a = Int32Array::from(vec![0, 3]); - let b = Int32Array::from(vec![1, 4]); - let c = Int32Array::from(vec![2, 5]); - let input_batch = InputBatch::Batch(vec![Arc::new(a), Arc::new(b), Arc::new(c)], 2); - let (mut scans, datafusion_plan) = planner.create_plan(&projection, &mut vec![], 1).unwrap(); - scans[0].set_input_batch(input_batch); let mut stream = datafusion_plan.native_plan.execute(0, task_ctx).unwrap(); diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index 4e6f8a172..c77d60135 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -255,53 +255,52 @@ fn cast_struct_to_struct( /// Cast a map type to another map type. The same as arrow-cast except we recursively call our own /// cast_array -pub(crate) fn cast_map_values( +fn cast_map_values( from: &MapArray, to_data_type: &DataType, parquet_options: &SparkParquetOptions, to_ordered: bool, ) -> Result<ArrayRef, DataFusionError> { - let entries_field = if let DataType::Map(entries_field, _) = to_data_type { - entries_field - } else { - return Err(DataFusionError::Internal( - "Internal Error: to_data_type is not a map type.".to_string(), - )); - }; + match to_data_type { + DataType::Map(entries_field, _) => { + let key_field = key_field(entries_field).ok_or(DataFusionError::Internal( + "map is missing key field".to_string(), + ))?; + let value_field = value_field(entries_field).ok_or(DataFusionError::Internal( + "map is missing value field".to_string(), + ))?; + + let key_array = cast_array( + Arc::<dyn Array>::clone(from.keys()), + key_field.data_type(), + parquet_options, + )?; + let value_array = cast_array( + Arc::<dyn Array>::clone(from.values()), + value_field.data_type(), + parquet_options, + )?; - let key_field = key_field(entries_field).ok_or(DataFusionError::Internal( - "map is missing key field".to_string(), - ))?; - let value_field = value_field(entries_field).ok_or(DataFusionError::Internal( - "map is missing value field".to_string(), - ))?; - - let key_array = cast_array( - Arc::<dyn Array>::clone(from.keys()), - key_field.data_type(), - parquet_options, - )?; - let value_array = cast_array( - Arc::<dyn Array>::clone(from.values()), - value_field.data_type(), - parquet_options, - )?; - - Ok(Arc::new(MapArray::new( - Arc::<arrow::datatypes::Field>::clone(entries_field), - from.offsets().clone(), - StructArray::new( - Fields::from(vec![key_field, value_field]), - vec![key_array, value_array], - from.entries().nulls().cloned(), - ), - from.nulls().cloned(), - to_ordered, - ))) + Ok(Arc::new(MapArray::new( + Arc::<arrow::datatypes::Field>::clone(entries_field), + from.offsets().clone(), + StructArray::new( + Fields::from(vec![key_field, value_field]), + vec![key_array, value_array], + from.entries().nulls().cloned(), + ), + from.nulls().cloned(), + to_ordered, + ))) + } + dt => Err(DataFusionError::Internal(format!( + "Expected MapType. Got: {dt}" + ))), + } } /// Gets the key field from the entries of a map. For all other types returns None. -pub(crate) fn key_field(entries_field: &FieldRef) -> Option<FieldRef> { +fn key_field(entries_field: &FieldRef) -> Option<FieldRef> { if let DataType::Struct(fields) = entries_field.data_type() { fields.first().cloned() } else { @@ -310,7 +309,7 @@ pub(crate) fn key_field(entries_field: &FieldRef) -> Option<FieldRef> { } /// Gets the value field from the entries of a map. For all other types returns None. -pub(crate) fn value_field(entries_field: &FieldRef) -> Option<FieldRef> { +fn value_field(entries_field: &FieldRef) -> Option<FieldRef> { if let DataType::Struct(fields) = entries_field.data_type() { fields.get(1).cloned() } else { diff --git a/native/spark-expr/src/array_funcs/get_array_struct_fields.rs b/native/spark-expr/src/array_funcs/get_array_struct_fields.rs index 990327c5d..6571a8081 100644 --- a/native/spark-expr/src/array_funcs/get_array_struct_fields.rs +++ b/native/spark-expr/src/array_funcs/get_array_struct_fields.rs @@ -15,7 +15,8 @@ // specific language governing permissions and limitations // under the License. -use arrow::array::{Array, GenericListArray, OffsetSizeTrait, StructArray}; +use arrow::array::{make_array, Array, GenericListArray, OffsetSizeTrait, StructArray}; +use arrow::buffer::NullBuffer; use arrow::datatypes::{DataType, FieldRef, Schema}; use arrow::record_batch::RecordBatch; use datafusion::common::{ @@ -80,10 +81,6 @@ impl PhysicalExpr for GetArrayStructFields { self } - fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result { - unimplemented!() - } - fn data_type(&self, input_schema: &Schema) -> DataFusionResult<DataType> { let struct_field = self.child_field(input_schema)?; match self.child.data_type(input_schema)? { @@ -138,6 +135,10 @@ impl PhysicalExpr for GetArrayStructFields { _ => internal_err!("GetArrayStructFields should have exactly one child"), } } + + fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result { + unimplemented!() + } } fn get_array_struct_fields<O: OffsetSizeTrait>( @@ -148,13 +149,36 @@ fn get_array_struct_fields<O: OffsetSizeTrait>( .values() .as_any() .downcast_ref::<StructArray>() - .expect("A struct is expected"); + .expect("A StructType is expected"); - let column = Arc::clone(values.column(ordinal)); let field = Arc::clone(&values.fields()[ordinal]); - - let offsets = list_array.offsets(); - let array = GenericListArray::new(field, offsets.clone(), column, list_array.nulls().cloned()); + // Get struct column by ordinal + let extracted_column = values.column(ordinal); + + let data = if values.null_count() == extracted_column.null_count() { + Arc::clone(extracted_column) + } else { + // In some cases the column obtained from struct by ordinal doesn't + // represent all nulls that imposed by parent values. + // This maybe caused by a low level reader bug and needs more investigation. + // For this specific case we patch the null buffer for the column by merging nulls buffers + // from parent and column + let merged_nulls = NullBuffer::union(values.nulls(), extracted_column.nulls()); + make_array( + extracted_column + .into_data() + .into_builder() + .nulls(merged_nulls) + .build()?, + ) + }; + + let array = GenericListArray::new( + field, + list_array.offsets().clone(), + data, + list_array.nulls().cloned(), + ); Ok(ColumnarValue::Array(Arc::new(array))) } diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index f7060d8a1..3fedaa7e3 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -1975,11 +1975,9 @@ object QueryPlanSerde extends Logging with CometExprShim { case mk: MapKeys => val childExpr = exprToProtoInternal(mk.child, inputs, binding) scalarFunctionExprToProto("map_keys", childExpr) -// commented out because of correctness issue -// https://github.com/apache/datafusion-comet/issues/1789 -// case mv: MapValues => -// val childExpr = exprToProtoInternal(mv.child, inputs, binding) -// scalarFunctionExprToProto("map_values", childExpr) + case mv: MapValues => + val childExpr = exprToProtoInternal(mv.child, inputs, binding) + scalarFunctionExprToProto("map_values", childExpr) case _ => withInfo(expr, s"${expr.prettyName} is not supported", expr.children: _*) None diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala index 3d725ee48..6d31d305d 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala @@ -332,18 +332,19 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper "select map_keys(c0).b from tbl") } -// commented out because of correctness issue https://github.com/apache/datafusion-comet/issues/1789 -// test("native reader - select nested field from a complex map[struct, struct] using map_values") { -// testSingleLineQuery( -// """ -// | select map(str0, str1) c0 from -// | ( -// | select named_struct('a', cast(1 as long), 'b', cast(2 as long), 'c', cast(3 as long)) str0, -// | named_struct('x', cast(8 as long), 'y', cast(9 as long), 'z', cast(0 as long)) str1 union all -// | select named_struct('a', cast(3 as long), 'b', cast(4 as long), 'c', cast(5 as long)) str0, -// | named_struct('x', cast(6 as long), 'y', cast(7 as long), 'z', cast(8 as long)) str1 -// | ) -// |""".stripMargin, -// "select map_values(c0).b from tbl") -// } + test( + "native reader - select nested field from a complex map[struct, struct] using map_values") { + testSingleLineQuery( + """ + | select map(str0, str1) c0 from + | ( + | select named_struct('a', cast(1 as long), 'b', cast(2 as long), 'c', cast(3 as long)) str0, + | named_struct('x', cast(8 as long), 'y', cast(9 as long), 'z', cast(0 as long)) str1 union all + | select named_struct('a', cast(3 as long), 'b', cast(4 as long), 'c', cast(5 as long)) str0, + | named_struct('x', cast(6 as long), 'y', cast(7 as long), 'z', cast(8 as long)) str1 union all + | select named_struct('a', cast(31 as long), 'b', cast(41 as long), 'c', cast(51 as long)), null + | ) + |""".stripMargin, + "select map_values(c0).y from tbl") + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org