This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 1c457d5c9 [GLUTEN-5599][VL] Support json_tuple (#5600)
1c457d5c9 is described below
commit 1c457d5c967f28e9a963d6c72439563b708b66c1
Author: WangGuangxin <[email protected]>
AuthorDate: Sat May 11 16:51:07 2024 +0800
[GLUTEN-5599][VL] Support json_tuple (#5600)
---
.../gluten/execution/GenerateExecTransformer.scala | 56 +++++++++++++++++-----
.../org/apache/gluten/execution/TestOperator.scala | 22 +++++++++
.../gluten/expression/ExpressionMappings.scala | 2 +
.../catalyst/expressions/JsonTupleExplode.scala | 24 ++++++++++
4 files changed, 92 insertions(+), 12 deletions(-)
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala
index 830fe396b..8f5782742 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala
@@ -128,7 +128,7 @@ object GenerateExecTransformer {
false
} else {
generator match {
- case _: Inline | _: ExplodeBase =>
+ case _: Inline | _: ExplodeBase | _: JsonTuple =>
true
case _ =>
false
@@ -138,9 +138,10 @@ object GenerateExecTransformer {
}
object PullOutGenerateProjectHelper extends PullOutProjectHelper {
+ val JSON_PATH_PREFIX = "$."
def pullOutPreProject(generate: GenerateExec): SparkPlan = {
if (GenerateExecTransformer.supportsGenerate(generate.generator,
generate.outer)) {
- val newGeneratorChildren = generate.generator match {
+ generate.generator match {
case _: Inline | _: ExplodeBase =>
val expressionMap = new mutable.HashMap[Expression,
NamedExpression]()
// The new child should be either the original Attribute,
@@ -156,20 +157,51 @@ object PullOutGenerateProjectHelper extends PullOutProjectHelper {
// generator.child is other expression, e.g
Literal/CreateArray/CreateMap
expressionMap.values.head
}
- Seq(newGeneratorChild)
+ val newGeneratorChildren = Seq(newGeneratorChild)
+
+ // Avoid using elimainateProjectList to create the project list
+ // because newGeneratorChild can be a duplicated Attribute in
generate.child.output.
+ // The native side identifies the last field of projection as
generator's input.
+ generate.copy(
+ generator =
+
generate.generator.withNewChildren(newGeneratorChildren).asInstanceOf[Generator],
+ child = ProjectExec(generate.child.output ++ newGeneratorChildren,
generate.child)
+ )
+ case JsonTuple(Seq(jsonObj, jsonPaths @ _*)) =>
+ val getJsons: IndexedSeq[Expression] = {
+ jsonPaths.map {
+ case jsonPath if jsonPath.foldable =>
+ Option(jsonPath.eval()) match {
+ case Some(path) =>
+ GetJsonObject(jsonObj, Literal.create(JSON_PATH_PREFIX +
path))
+ case _ =>
+ Literal.create(null)
+ }
+ case jsonPath =>
+ // TODO: The prefix is just for adapting to GetJsonObject.
+ // Maybe, we can remove this handling in the future by
+ // making path without "$." recognized
+ GetJsonObject(jsonObj,
Concat(Seq(Literal.create(JSON_PATH_PREFIX), jsonPath)))
+ }.toIndexedSeq
+ }
+ val preGenerateExprs =
+ Alias(
+ CreateArray(Seq(CreateStruct(getJsons))),
+ generatePreAliasName
+ )()
+ // use JsonTupleExplode here instead of Explode so that we can
distinguish
+ // JsonTuple and Explode, because JsonTuple has an extra
post-projection
+ val newGenerator = JsonTupleExplode(preGenerateExprs.toAttribute)
+ generate.copy(
+ generator = newGenerator,
+ child = ProjectExec(generate.child.output ++
Seq(preGenerateExprs), generate.child)
+ )
case _ =>
// Unreachable.
throw new IllegalStateException(
s"Generator ${generate.generator.getClass.getSimpleName} is not
supported.")
}
- // Avoid using elimainateProjectList to create the project list
- // because newGeneratorChild can be a duplicated Attribute in
generate.child.output.
- // The native side identifies the last field of projection as
generator's input.
- generate.copy(
- generator =
-
generate.generator.withNewChildren(newGeneratorChildren).asInstanceOf[Generator],
- child = ProjectExec(generate.child.output ++ newGeneratorChildren,
generate.child)
- )
+
} else {
generate
}
@@ -191,7 +223,7 @@ object PullOutGenerateProjectHelper extends PullOutProjectHelper {
ProjectExec(
(generate.requiredChildOutput :+ ordinal) ++
generate.generatorOutput.tail,
newGenerate)
- case Inline(_) =>
+ case Inline(_) | JsonTupleExplode(_) =>
val unnestOutput = {
val struct = CreateStruct(generate.generatorOutput)
val alias = Alias(struct, generatePostAliasName)()
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
index b69223be1..82d008e1b 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
@@ -1407,4 +1407,26 @@ class TestOperator extends VeloxWholeStageTransformerSuite {
// Verify there is not precision loss for timestamp columns after data
broadcast.
checkAnswer(df, expected)
}
+
+  test("Test json_tuple function") {
+    withTempView("t") {
+      Seq[String]("{\"a\":\"b\"}", null, "{\"b\":\"a\"}")
+        .toDF("json_field")
+        .createOrReplaceTempView("t")
+      runQueryAndCompare(
+        "SELECT * from t lateral view json_tuple(json_field, 'a', 'b') as fa, fb") {
+        checkGlutenOperatorMatch[GenerateExecTransformer]
+      }
+    }
+
+    runQueryAndCompare(
+      """
+        |SELECT
+        | l_orderkey,
+        | json_tuple('{"a" : 1, "b" : 2}', CAST(NULL AS STRING), 'b', CAST(NULL AS STRING), 'a')
+        |from lineitem
+        |""".stripMargin) {
+      checkGlutenOperatorMatch[GenerateExecTransformer]
+    }
+  }
}
diff --git a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
index 459a3d8e2..1592b0b9a 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
@@ -214,6 +214,8 @@ object ExpressionMappings {
Sig[Sequence](SEQUENCE),
Sig[CreateArray](CREATE_ARRAY),
Sig[Explode](EXPLODE),
+ // JsonTupleExplode' behavior are the same with Explode
+ Sig[JsonTupleExplode](EXPLODE),
Sig[Inline](INLINE),
Sig[ArrayAggregate](AGGREGATE),
Sig[LambdaFunction](LAMBDAFUNCTION),
diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/catalyst/expressions/JsonTupleExplode.scala b/gluten-core/src/main/scala/org/apache/spark/sql/catalyst/expressions/JsonTupleExplode.scala
new file mode 100644
index 000000000..b93595cfb
--- /dev/null
+++ b/gluten-core/src/main/scala/org/apache/spark/sql/catalyst/expressions/JsonTupleExplode.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.expressions
+
+/** Explode without position; a distinct class so a rewritten JsonTuple can be recognized. */
+case class JsonTupleExplode(child: Expression) extends ExplodeBase {
+  override val position: Boolean = false
+  override protected def withNewChildInternal(newChild: Expression): JsonTupleExplode =
+    copy(child = newChild)
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]