Weijun-H commented on code in PR #11969:
URL: https://github.com/apache/datafusion/pull/11969#discussion_r1718205611


##########
datafusion/functions-nested/src/map_extract.rs:
##########
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`ScalarUDFImpl`] definitions for map_extract functions.
+
+use arrow::array::{ArrayRef, Capacities, MutableArrayData};
+use arrow_array::make_array;
+
+use arrow::datatypes::{DataType, Float64Type, Int64Type, UInt64Type};
+use arrow_array::{Array, MapArray, PrimitiveArray, StringArray, StringViewArray};
+use datafusion_common::utils::get_map_entry_field;
+use datafusion_common::DataFusionError;
+use datafusion_common::{cast::as_map_array, exec_err, Result};
+use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
+use std::any::Any;
+
+use crate::utils::make_scalar_function;
+
+// Create static instances of ScalarUDFs for each function
+make_udf_expr_and_func!( // NOTE(review): macro is defined outside this hunk; presumably it generates the `map_extract` expression helper and the `map_extract_udf` accessor for `MapExtract` - confirm
+    MapExtract,
+    map_extract,
+    map key,
+    "Return corresponding values from a map for a given key,  or NULL if the 
key is not found.",
+    map_extract_udf
+);
+
+#[derive(Debug)]
+pub(super) struct MapExtract { // Scalar UDF implementation whose `name()` is "map_extract".
+    signature: Signature, // Set in `new()` to `Signature::user_defined(Volatility::Immutable)`.
+    aliases: Vec<String>, // Alternative names returned by `aliases()`; `new()` sets the single alias "element_at".
+}
+
+impl MapExtract {
+    pub fn new() -> Self { // Builds the UDF with a user-defined, immutable signature and one alias.
+        Self {
+            signature: Signature::user_defined(Volatility::Immutable), // User-defined signature; presumably this is why `coerce_types` is implemented for this UDF - confirm.
+            aliases: vec![String::from("element_at")], // The function is also reachable as `element_at`.
+        }
+    }
+}
+
+impl ScalarUDFImpl for MapExtract { // Implements the `ScalarUDFImpl` trait for `MapExtract`.
+    fn as_any(&self) -> &dyn Any { // Exposes `self` as `&dyn Any`.
+        self
+    }
+    fn name(&self) -> &str { // Primary function name.
+        "map_extract"
+    }
+
+    fn signature(&self) -> &Signature { // Returns the signature stored at construction.
+        &self.signature
+    }
+
+    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> { // Output type is the data type of the map entry field at index 1.
+        if arg_types.len() != 2 { // Exactly two argument types are required.
+            return exec_err!("map_extract expects two arguments");
+        }
+        let map_type = &arg_types[0]; // The first argument is treated as the map.
+        let map_fields = get_map_entry_field(map_type)?; // Helper errors are propagated; presumably it fails when the type is not a map - confirm.
+        Ok(map_fields[1].data_type().clone()) // Index 1 is presumably the value field (index 0 is used as the key type in `coerce_types`).
+    }
+
+    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> { // Runs `map_extract_inner` (not shown in this hunk) through the `make_scalar_function` adapter.
+        make_scalar_function(map_extract_inner)(args)
+    }
+
+    fn aliases(&self) -> &[String] { // Returns the alias list stored at construction.
+        &self.aliases
+    }
+
+    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> { // Keeps the map type unchanged and coerces the second argument to the first entry field's type.
+        if arg_types.len() != 2 { // Same arity check and message as `return_type`.
+            return exec_err!("map_extract expects two arguments");
+        }
+
+        let field = get_map_entry_field(&arg_types[0])?; // Entry fields of the map type; errors are propagated.
+        Ok(vec![
+            arg_types[0].clone(), // Map argument: type left as given.
+            field.first().unwrap().data_type().clone(), // Query key: coerced to the first entry field's type. NOTE(review): `unwrap()` panics if the helper returns no fields.
+        ])
+    }
+}
+
+macro_rules! impl_map_extract_inner { // Generates a lookup function for one concrete key array type: per map row, emit the value whose key equals that row's query key, or a null.
+    ($func_name:ident, $key_type:ty, $array_type:ty) => { // NOTE(review): `$key_type` is accepted but never referenced in the expansion.
+        fn $func_name(
+            map_array: &MapArray,
+            query_keys_array: &dyn Array,
+        ) -> Result<ArrayRef> { // Returns an array with one element per map row.
+            let keys_array = map_array // Downcast the map's key column to the concrete array type.
+                .keys()
+                .as_any()
+                .downcast_ref::<$array_type>()
+                .ok_or_else(|| { // A type mismatch is reported as an internal error rather than a panic.
+                    DataFusionError::Internal(format!(
+                        "Failed to downcast keys array to {}",
+                        stringify!($array_type)
+                    ))
+                })?;
+            let query_keys_array = query_keys_array // The query keys must have the same concrete array type as the map keys.
+                .as_any()
+                .downcast_ref::<$array_type>()
+                .ok_or_else(|| {
+                    DataFusionError::Internal(format!(
+                        "Failed to downcast query keys array to {}",
+                        stringify!($array_type)
+                    ))
+                })?;
+
+            let values = map_array.values(); // Value column that matching entries are copied from.
+            let original_data = values.to_data();
+            let capacity = Capacities::Array(original_data.len()); // Capacity hint is the length of the whole value column.
+
+            let mut offsets = vec![0_i32]; // NOTE(review): `offsets` is appended to in the loop but never read in this function.
+
+            let mut mutable = // Builder over the value data; the `true` flag presumably enables the `extend_nulls` call below - confirm against arrow docs.
+                MutableArrayData::with_capacities(vec![&original_data], true, 
capacity);
+
+            for (row_index, offset_window) in // One iteration per map row: each pair of consecutive offsets is that row's [start, end) entry range.
+                map_array.value_offsets().windows(2).enumerate()
+            {
+                let start = offset_window[0] as usize;
+                let end = offset_window[1] as usize;
+                let len = end - start; // Number of entries in this row's map.
+
+                let query_key = query_keys_array.value(row_index); // NOTE(review): read without a null check; assumes the query array has a row for every map row.
+
+                let value_index = keys_array // Linear scan of this row's keys for the first one equal to the query key.
+                    .slice(start, len)
+                    .iter()
+                    .position(|key| key.unwrap() == query_key); // `unwrap()` panics on a null key; presumably map keys are never null - confirm.
+                match value_index {
+                    Some(index) => {
+                        let new_index = start + index; // Convert the row-relative position to an index into the value column.
+                        mutable.extend(0, new_index, new_index + 1); // Copy exactly one value from source array 0.
+                        offsets.push(offsets.last().unwrap().to_owned() + 1);
+                    }
+                    None => { // Key not present in this row: emit a null.
+                        offsets.push(offsets.last().unwrap().to_owned());
+                        mutable.extend_nulls(1);
+                    }
+                }
+            }
+
+            let data = mutable.freeze(); // Finalize the builder into array data.
+            Ok(make_array(data))
+        }
+    };
+}
+
+// Instantiate the lookup for Int64, UInt64 and Float64 primitive key arrays, plus StringArray and StringViewArray keys
+impl_map_extract_inner!(
+    generic_map_extract_inner_int,
+    i64,
+    PrimitiveArray<Int64Type>
+);
+impl_map_extract_inner!(
+    generic_map_extract_inner_uint,
+    u64,
+    PrimitiveArray<UInt64Type>
+);
+impl_map_extract_inner!(
+    generic_map_extract_inner_float, // NOTE(review): keys are compared with `==`, so a NaN query key never matches a NaN map key.
+    f64,
+    PrimitiveArray<Float64Type>
+);
+impl_map_extract_inner!(string_map_extract_inner, &str, StringArray);
+impl_map_extract_inner!(string_view_map_extract_inner, &str, StringViewArray);

Review Comment:
   Use a macro here to avoid code duplication.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to