This is an automated email from the ASF dual-hosted git repository.
lgbo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 6ffe05d9be replace from_json with get_json_object (#8409)
6ffe05d9be is described below
commit 6ffe05d9bea0da75cab16494dec1e39e92846e9e
Author: lgbo <[email protected]>
AuthorDate: Tue Jan 21 09:51:20 2025 +0800
replace from_json with get_json_object (#8409)
---
.../gluten/backendsapi/clickhouse/CHBackend.scala | 7 ++
.../gluten/backendsapi/clickhouse/CHRuleApi.scala | 1 +
.../extension/BasicExpressionRewriteRule.scala | 77 ++++++++++++++++++++++
.../execution/GlutenFunctionValidateSuite.scala | 21 ++++++
4 files changed, 106 insertions(+)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index a47aab55fe..e883f8c454 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -395,6 +395,13 @@ object CHBackendSettings extends BackendSettingsApi with
Logging {
)
}
+ def enableReplaceFromJsonWithGetJsonObject(): Boolean = {
+ SparkEnv.get.conf.getBoolean(
+ CHConf.runtimeConfig("enable_replace_from_json_with_get_json_object"),
+ defaultValue = true
+ )
+ }
+
override def enableNativeWriteFiles(): Boolean = {
GlutenConfig.get.enableNativeWriter.getOrElse(false)
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
index c79931fa4e..40344e96e7 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
@@ -63,6 +63,7 @@ object CHRuleApi {
injector.injectResolutionRule(spark => new
RewriteToDateExpresstionRule(spark))
injector.injectResolutionRule(spark => new
RewriteDateTimestampComparisonRule(spark))
injector.injectResolutionRule(spark => new
CollapseGetJsonObjectExpressionRule(spark))
+ injector.injectResolutionRule(spark => new
RepalceFromJsonWithGetJsonObject(spark))
injector.injectOptimizerRule(spark => new
CommonSubexpressionEliminateRule(spark))
injector.injectOptimizerRule(spark => new ExtendedColumnPruning(spark))
injector.injectOptimizerRule(spark =>
CHAggregateFunctionRewriteRule(spark))
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/BasicExpressionRewriteRule.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/BasicExpressionRewriteRule.scala
new file mode 100644
index 0000000000..5943582b1e
--- /dev/null
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/BasicExpressionRewriteRule.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension
+
+import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/*
+ * This file includes some rules to replace expressions in a more efficient way.
+ */
+
+// Try to replace `from_json` with `get_json_object` if possible.
+class RepalceFromJsonWithGetJsonObject(spark: SparkSession) extends
Rule[LogicalPlan] with Logging {
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ if (!CHBackendSettings.enableReplaceFromJsonWithGetJsonObject ||
!plan.resolved) {
+ plan
+ } else {
+ visitPlan(plan)
+ }
+ }
+
+ def visitPlan(plan: LogicalPlan): LogicalPlan = {
+ val newPlan = plan match {
+ case project: Project =>
+ val newProjectList =
+ project.projectList.map(expr =>
visitExpression(expr).asInstanceOf[NamedExpression])
+ project.copy(projectList = newProjectList, child =
visitPlan(project.child))
+ case filter: Filter =>
+ val newCondition = visitExpression(filter.condition)
+ Filter(newCondition, visitPlan(filter.child))
+ case other =>
+ other.withNewChildren(other.children.map(visitPlan))
+ }
+ // Some plan nodes have tags; we need to copy the tags to the new ones.
+ newPlan.copyTagsFrom(plan)
+ newPlan
+ }
+
+ def visitExpression(expr: Expression): Expression = {
+ expr match {
+ case getMapValue: GetMapValue
+ if getMapValue.child.isInstanceOf[JsonToStructs] &&
+ getMapValue.child.dataType.isInstanceOf[MapType] &&
+
getMapValue.child.dataType.asInstanceOf[MapType].valueType.isInstanceOf[StringType]
&&
+ getMapValue.key.isInstanceOf[Literal] &&
+ getMapValue.key.dataType.isInstanceOf[StringType] =>
+ val child =
visitExpression(getMapValue.child.asInstanceOf[JsonToStructs].child)
+ val key =
UTF8String.fromString(s"$$.${getMapValue.key.asInstanceOf[Literal].value}")
+ GetJsonObject(child, Literal(key, StringType))
+ case literal: Literal => literal
+ case attr: Attribute => attr
+ case other =>
+ other.withNewChildren(other.children.map(visitExpression))
+ }
+ }
+}
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenFunctionValidateSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenFunctionValidateSuite.scala
index f84557e6e9..84c92d1e04 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenFunctionValidateSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenFunctionValidateSuite.scala
@@ -995,4 +995,25 @@ class GlutenFunctionValidateSuite extends
GlutenClickHouseWholeStageTransformerS
}
compareResultsAgainstVanillaSpark(sql, true, checkProjects, false)
}
+
+ test("GLUTEN-8406 replace from_json with get_json_object") {
+ withTable("test_8406") {
+ spark.sql("create table test_8406(x string) using parquet")
+ val insert_sql =
+ """
+ |insert into test_8406 values
+ |('{"a":1}'),
+ |('{"a":2'),
+ |('{"b":3}'),
+ |('{"a":"5"}'),
+ |('{"a":{"x":1}}')
+ |""".stripMargin
+ spark.sql(insert_sql)
+ val sql =
+ """
+ |select from_json(x, 'Map<String, String>')['a'] from test_8406
+ |""".stripMargin
+ compareResultsAgainstVanillaSpark(sql, true, { _ => })
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]