This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 9d10ea88f feat: Add support to lookup map by key (#1898)
9d10ea88f is described below

commit 9d10ea88fc74c98a57791857ed5a8c580762fbe0
Author: Oleks V <comph...@users.noreply.github.com>
AuthorDate: Wed Jun 18 10:44:55 2025 -0700

    feat: Add support to lookup map by key (#1898)
    
    * feat: support lookup map by key
    
    * fmt
    
    * clippy
    
    * Update native/core/src/execution/planner.rs
    
    Co-authored-by: Andy Grove <agr...@apache.org>
    
    ---------
    
    Co-authored-by: Andy Grove <agr...@apache.org>
---
 native/core/src/execution/planner.rs               | 16 ++++++++-
 .../org/apache/comet/serde/QueryPlanSerde.scala    |  4 +++
 .../apache/comet/exec/CometNativeReaderSuite.scala | 42 ++++++++++++++++++++++
 3 files changed, 61 insertions(+), 1 deletion(-)

diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 8f725e661..2e2764af9 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -572,7 +572,21 @@ impl PhysicalPlanner {
                     fail_on_error,
                 )))
             }
-            ExprStruct::ScalarFunc(expr) => self.create_scalar_function_expr(expr, input_schema),
+            ExprStruct::ScalarFunc(expr) => {
+                let func = self.create_scalar_function_expr(expr, input_schema);
+                match expr.func.as_ref() {
+                    // DataFusion map_extract returns array of struct entries even if lookup by key
+                    // Apache Spark wants a single value, so wrap the result into additional list extraction
+                    "map_extract" => Ok(Arc::new(ListExtract::new(
+                        func?,
+                        Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
+                        None,
+                        true,
+                        false,
+                    ))),
+                    _ => func,
+                }
+            }
             ExprStruct::EqNullSafe(expr) => {
                 let left =
                     self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?;
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 6b8740df0..229ff032a 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -1970,6 +1970,10 @@ object QueryPlanSerde extends Logging with CometExprShim {
       case mv: MapValues =>
         val childExpr = exprToProtoInternal(mv.child, inputs, binding)
         scalarFunctionExprToProto("map_values", childExpr)
+      case gmv: GetMapValue =>
+        val mapExpr = exprToProtoInternal(gmv.child, inputs, binding)
+        val keyExpr = exprToProtoInternal(gmv.key, inputs, binding)
+        scalarFunctionExprToProto("map_extract", mapExpr, keyExpr)
       case _ =>
         withInfo(expr, s"${expr.prettyName} is not supported", expr.children: _*)
         None
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala
index 5dbcdfee1..f33da3ba7 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala
@@ -394,4 +394,46 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper
       "select * from tbl",
       readSchema = Some(readSchema))
   }
+
+  test("native reader - extract map by key") {
+    // existing key
+    testSingleLineQuery(
+      """
+        | select map(str0, str1) c0 from
+        | (
+        |    select 'key0' str0, named_struct('a', 1, 'b', 'str') str1
+        | )
+        |""".stripMargin,
+      "select c0['key0'] from tbl")
+
+    // existing key, existing struct subfield
+    testSingleLineQuery(
+      """
+        | select map(str0, str1) c0 from
+        | (
+        |    select 'key0' str0, named_struct('a', 1, 'b', 'str') str1
+        | )
+        |""".stripMargin,
+      "select c0['key0'].b from tbl")
+
+    // nonexisting key
+    testSingleLineQuery(
+      """
+        | select map(str0, str1) c0 from
+        | (
+        |    select 'key0' str0, named_struct('a', 1, 'b', 'str') str1
+        | )
+        |""".stripMargin,
+      "select c0['key1'] from tbl")
+
+    // nonexisting key, existing struct subfield
+    testSingleLineQuery(
+      """
+        | select map(str0, str1) c0 from
+        | (
+        |    select 'key0' str0, named_struct('a', 1, 'b', 'str') str1
+        | )
+        |""".stripMargin,
+      "select c0['key1'].b from tbl")
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to