This is an automated email from the ASF dual-hosted git repository.

marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 1c457d5c9 [GLUTEN-5599][VL] Support json_tuple (#5600)
1c457d5c9 is described below

commit 1c457d5c967f28e9a963d6c72439563b708b66c1
Author: WangGuangxin <[email protected]>
AuthorDate: Sat May 11 16:51:07 2024 +0800

    [GLUTEN-5599][VL] Support json_tuple (#5600)
---
 .../gluten/execution/GenerateExecTransformer.scala | 56 +++++++++++++++++-----
 .../org/apache/gluten/execution/TestOperator.scala | 22 +++++++++
 .../gluten/expression/ExpressionMappings.scala     |  2 +
 .../catalyst/expressions/JsonTupleExplode.scala    | 24 ++++++++++
 4 files changed, 92 insertions(+), 12 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala
index 830fe396b..8f5782742 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala
@@ -128,7 +128,7 @@ object GenerateExecTransformer {
       false
     } else {
       generator match {
-        case _: Inline | _: ExplodeBase =>
+        case _: Inline | _: ExplodeBase | _: JsonTuple =>
           true
         case _ =>
           false
@@ -138,9 +138,10 @@ object GenerateExecTransformer {
 }
 
 object PullOutGenerateProjectHelper extends PullOutProjectHelper {
+  val JSON_PATH_PREFIX = "$."
   def pullOutPreProject(generate: GenerateExec): SparkPlan = {
     if (GenerateExecTransformer.supportsGenerate(generate.generator, 
generate.outer)) {
-      val newGeneratorChildren = generate.generator match {
+      generate.generator match {
         case _: Inline | _: ExplodeBase =>
           val expressionMap = new mutable.HashMap[Expression, 
NamedExpression]()
           // The new child should be either the original Attribute,
@@ -156,20 +157,51 @@ object PullOutGenerateProjectHelper extends 
PullOutProjectHelper {
             // generator.child is other expression, e.g 
Literal/CreateArray/CreateMap
             expressionMap.values.head
           }
-          Seq(newGeneratorChild)
+          val newGeneratorChildren = Seq(newGeneratorChild)
+
+          // Avoid using eliminateProjectList to create the project list
+          // because newGeneratorChild can be a duplicated Attribute in 
generate.child.output.
+          // The native side identifies the last field of projection as 
generator's input.
+          generate.copy(
+            generator =
+              
generate.generator.withNewChildren(newGeneratorChildren).asInstanceOf[Generator],
+            child = ProjectExec(generate.child.output ++ newGeneratorChildren, 
generate.child)
+          )
+        case JsonTuple(Seq(jsonObj, jsonPaths @ _*)) =>
+          val getJsons: IndexedSeq[Expression] = {
+            jsonPaths.map {
+              case jsonPath if jsonPath.foldable =>
+                Option(jsonPath.eval()) match {
+                  case Some(path) =>
+                    GetJsonObject(jsonObj, Literal.create(JSON_PATH_PREFIX + 
path))
+                  case _ =>
+                    Literal.create(null)
+                }
+              case jsonPath =>
+                // TODO: The prefix is just for adapting to GetJsonObject.
+                // Maybe, we can remove this handling in the future by
+                // making path without "$." recognized
+                GetJsonObject(jsonObj, 
Concat(Seq(Literal.create(JSON_PATH_PREFIX), jsonPath)))
+            }.toIndexedSeq
+          }
+          val preGenerateExprs =
+            Alias(
+              CreateArray(Seq(CreateStruct(getJsons))),
+              generatePreAliasName
+            )()
+          // use JsonTupleExplode here instead of Explode so that we can 
distinguish
+          // JsonTuple and Explode, because JsonTuple has an extra 
post-projection
+          val newGenerator = JsonTupleExplode(preGenerateExprs.toAttribute)
+          generate.copy(
+            generator = newGenerator,
+            child = ProjectExec(generate.child.output ++ 
Seq(preGenerateExprs), generate.child)
+          )
         case _ =>
           // Unreachable.
           throw new IllegalStateException(
             s"Generator ${generate.generator.getClass.getSimpleName} is not 
supported.")
       }
-      // Avoid using elimainateProjectList to create the project list
-      // because newGeneratorChild can be a duplicated Attribute in 
generate.child.output.
-      // The native side identifies the last field of projection as 
generator's input.
-      generate.copy(
-        generator =
-          
generate.generator.withNewChildren(newGeneratorChildren).asInstanceOf[Generator],
-        child = ProjectExec(generate.child.output ++ newGeneratorChildren, 
generate.child)
-      )
+
     } else {
       generate
     }
@@ -191,7 +223,7 @@ object PullOutGenerateProjectHelper extends 
PullOutProjectHelper {
           ProjectExec(
             (generate.requiredChildOutput :+ ordinal) ++ 
generate.generatorOutput.tail,
             newGenerate)
-        case Inline(_) =>
+        case Inline(_) | JsonTupleExplode(_) =>
           val unnestOutput = {
             val struct = CreateStruct(generate.generatorOutput)
             val alias = Alias(struct, generatePostAliasName)()
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala 
b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
index b69223be1..82d008e1b 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
@@ -1407,4 +1407,26 @@ class TestOperator extends 
VeloxWholeStageTransformerSuite {
     // Verify there is not precision loss for timestamp columns after data 
broadcast.
     checkAnswer(df, expected)
   }
+
+  test("Test json_tuple function") {
+    withTempView("t") {
+      Seq[(String)](("{\"a\":\"b\"}"), (null), ("{\"b\":\"a\"}"))
+        .toDF("json_field")
+        .createOrReplaceTempView("t")
+      runQueryAndCompare(
+        "SELECT * from t lateral view json_tuple(json_field, 'a', 'b') as fa, 
fb") {
+        checkGlutenOperatorMatch[GenerateExecTransformer]
+      }
+    }
+
+    runQueryAndCompare(
+      """
+        |SELECT
+        | l_orderkey,
+        | json_tuple('{"a" : 1, "b" : 2}', CAST(NULL AS STRING), 'b', 
CAST(NULL AS STRING), 'a')
+        |from lineitem
+        |""".stripMargin) {
+      checkGlutenOperatorMatch[GenerateExecTransformer]
+    }
+  }
 }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
index 459a3d8e2..1592b0b9a 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
@@ -214,6 +214,8 @@ object ExpressionMappings {
     Sig[Sequence](SEQUENCE),
     Sig[CreateArray](CREATE_ARRAY),
     Sig[Explode](EXPLODE),
+    // JsonTupleExplode's behavior is the same as Explode's
+    Sig[JsonTupleExplode](EXPLODE),
     Sig[Inline](INLINE),
     Sig[ArrayAggregate](AGGREGATE),
     Sig[LambdaFunction](LAMBDAFUNCTION),
diff --git 
a/gluten-core/src/main/scala/org/apache/spark/sql/catalyst/expressions/JsonTupleExplode.scala
 
b/gluten-core/src/main/scala/org/apache/spark/sql/catalyst/expressions/JsonTupleExplode.scala
new file mode 100644
index 000000000..b93595cfb
--- /dev/null
+++ 
b/gluten-core/src/main/scala/org/apache/spark/sql/catalyst/expressions/JsonTupleExplode.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.expressions
+
+/** Used for transforming JsonTuple. The behavior is the same as Explode. */
+case class JsonTupleExplode(child: Expression) extends ExplodeBase {
+  override val position: Boolean = false
+  override protected def withNewChildInternal(newChild: Expression): 
JsonTupleExplode =
+    copy(child = newChild)
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to