This is an automated email from the ASF dual-hosted git repository.
yuanzhou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new f9b9dbadfb [VL] Enable to_json function (#9357)
f9b9dbadfb is described below
commit f9b9dbadfbb65296ea3fc64da5de8b0a99d17896
Author: Wechar Yu <[email protected]>
AuthorDate: Tue Aug 12 16:13:43 2025 +0800
[VL] Enable to_json function (#9357)
Enable the to_json function now that Velox supports it (facebookincubator/velox#11995).
---
.../backendsapi/velox/VeloxSparkPlanExecApi.scala | 11 +++++++++++
.../functions/JsonFunctionsValidateSuite.scala | 22 ++++++++++++++++++++++
cpp/core/config/GlutenConfig.h | 2 ++
cpp/velox/compute/WholeStageResultIterator.cc | 3 +++
cpp/velox/substrait/SubstraitToVeloxExpr.cc | 3 ++-
.../substrait/SubstraitToVeloxPlanValidator.cc | 2 +-
.../substrait/expression/StructLiteralNode.java | 4 +++-
.../apache/gluten/substrait/type/StructNode.java | 4 ++++
.../substrait/proto/substrait/algebra.proto | 1 +
.../gluten/backendsapi/SparkPlanExecApi.scala | 7 +++++++
.../org/apache/gluten/config/GlutenConfig.scala | 1 +
.../gluten/expression/ExpressionConverter.scala | 6 ++++++
...ormer.scala => JsonExpressionTransformer.scala} | 13 ++++++++++++-
13 files changed, 75 insertions(+), 4 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index bcd759566f..12e2c80607 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -794,6 +794,17 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
GenericExpressionTransformer(substraitExprName, children, expr)
}
+  /**
+   * Generate an expression transformer to transform StructsToJson to Substrait.
+   *
+   * The Velox implementation does not take Spark's JSON generator options, so a `to_json` call
+   * carrying a non-empty options map is rejected with [[GlutenNotSupportException]].
+   *
+   * @param substraitExprName Substrait function name mapped for `to_json`
+   * @param child transformer for the value being serialized
+   * @param expr the original Spark expression
+   * @return a [[ToJsonTransformer]] for the Velox backend
+   */
+  override def genToJsonTransformer(
+      substraitExprName: String,
+      child: ExpressionTransformer,
+      expr: StructsToJson): ExpressionTransformer = {
+    if (expr.options.nonEmpty) {
+      throw new GlutenNotSupportException("'to_json' with options is not supported in Velox")
+    }
+    ToJsonTransformer(substraitExprName, child, expr)
+  }
+
/** Generate an expression transformer to transform NamedStruct to
Substrait. */
override def genNamedStructTransformer(
substraitExprName: String,
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/functions/JsonFunctionsValidateSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/functions/JsonFunctionsValidateSuite.scala
index b4021e9d1e..fd54502e45 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/functions/JsonFunctionsValidateSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/functions/JsonFunctionsValidateSuite.scala
@@ -377,4 +377,26 @@ class JsonFunctionsValidateSuite extends
FunctionsValidateSuite {
}
}
}
+
+  test("to_json function") {
+    withTable("t") {
+      spark.sql(
+        "create table t (a int, b string, c array<int>, d map<int, string>) using parquet")
+      // Row 4 is all-null so null handling (including ignoreNullFields) is exercised.
+      spark.sql("""insert into t values (1, 'str', array(1,2,3), map(1, 'v')),
+                  |(2, 'str2', array(), map(1, 'v1', 2, 'v2')),
+                  |(3, '', array(1), map()),
+                  |(4, null, null, null)
+                  |""".stripMargin)
+
+      runQueryAndCompare("select to_json(named_struct('a', a, 'b', b, 'c', c, 'd', d)) from t") {
+        checkGlutenOperatorMatch[ProjectExecTransformer]
+      }
+
+      // spark.sql.jsonGenerator.ignoreNullFields is forwarded to the native engine; cover the
+      // non-default value so null struct fields must be emitted as explicit JSON nulls.
+      withSQLConf("spark.sql.jsonGenerator.ignoreNullFields" -> "false") {
+        runQueryAndCompare(
+          "select to_json(named_struct('a', a, 'b', b, 'c', c, 'd', d)) from t") {
+          checkGlutenOperatorMatch[ProjectExecTransformer]
+        }
+      }
+
+      runQueryAndCompare("select to_json(c) from t") {
+        checkGlutenOperatorMatch[ProjectExecTransformer]
+      }
+
+      runQueryAndCompare("select to_json(d) from t") {
+        checkGlutenOperatorMatch[ProjectExecTransformer]
+      }
+    }
+  }
}
diff --git a/cpp/core/config/GlutenConfig.h b/cpp/core/config/GlutenConfig.h
index 98d992a354..fe24260306 100644
--- a/cpp/core/config/GlutenConfig.h
+++ b/cpp/core/config/GlutenConfig.h
@@ -90,6 +90,8 @@ const std::string kSparkMapKeyDedupPolicy =
"spark.sql.mapKeyDedupPolicy";
const std::string kSparkLegacyStatisticalAggregate =
"spark.sql.legacy.statisticalAggregate";
+const std::string kSparkJsonIgnoreNullFields =
"spark.sql.jsonGenerator.ignoreNullFields";
+
// cudf
#ifdef GLUTEN_ENABLE_GPU
const std::string kCudfEnabled = "spark.gluten.sql.columnar.cudf";
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc
b/cpp/velox/compute/WholeStageResultIterator.cc
index b83ab7f120..841aaac232 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -584,6 +584,9 @@ std::unordered_map<std::string, std::string>
WholeStageResultIterator::getQueryC
configs[velox::core::QueryConfig::kSparkLegacyStatisticalAggregate] =
std::to_string(veloxCfg_->get<bool>(kSparkLegacyStatisticalAggregate,
false));
+ configs[velox::core::QueryConfig::kSparkJsonIgnoreNullFields] =
+ std::to_string(veloxCfg_->get<bool>(kSparkJsonIgnoreNullFields, true));
+
#ifdef GLUTEN_ENABLE_GPU
if (veloxCfg_->get<bool>(kCudfEnabled, false)) {
// TODO: wait for PR
https://github.com/facebookincubator/velox/pull/13341
diff --git a/cpp/velox/substrait/SubstraitToVeloxExpr.cc
b/cpp/velox/substrait/SubstraitToVeloxExpr.cc
index 5d38aa7961..fdee942eaa 100755
--- a/cpp/velox/substrait/SubstraitToVeloxExpr.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxExpr.cc
@@ -515,8 +515,9 @@ RowVectorPtr
SubstraitVeloxExprConverter::literalsToRowVector(const ::substrait:
vectors.reserve(numFields);
names.reserve(numFields);
for (auto i = 0; i < numFields; ++i) {
- names.push_back("col_" + std::to_string(i));
const auto& child = structLiteral.struct_().fields(i);
+ const auto& name = structLiteral.struct_().names(i);
+ names.push_back(name);
auto typeCase = child.literal_type_case();
switch (typeCase) {
case
::substrait::Expression_Literal::LiteralTypeCase::kIntervalDayToSecond: {
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
index 37f2d96385..ebc0a0675b 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
@@ -59,7 +59,7 @@ const std::unordered_set<std::string> kRegexFunctions = {
"rlike"};
const std::unordered_set<std::string> kBlackList =
- {"split_part", "sequence", "approx_percentile", "get_array_struct_fields",
"map_from_arrays", "to_json"};
+ {"split_part", "sequence", "approx_percentile", "get_array_struct_fields",
"map_from_arrays"};
} // namespace
bool SubstraitToVeloxPlanValidator::parseVeloxType(
diff --git
a/gluten-substrait/src/main/java/org/apache/gluten/substrait/expression/StructLiteralNode.java
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/expression/StructLiteralNode.java
index 0912b53d2c..981fa36f4c 100644
---
a/gluten-substrait/src/main/java/org/apache/gluten/substrait/expression/StructLiteralNode.java
+++
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/expression/StructLiteralNode.java
@@ -93,8 +93,10 @@ public class StructLiteralNode extends
LiteralNodeWithValue<InternalRow> {
@Override
protected void updateLiteralBuilder(Builder literalBuilder, InternalRow row)
{
Expression.Literal.Struct.Builder structBuilder =
Expression.Literal.Struct.newBuilder();
- for (int i = 0; i < ((StructNode) getTypeNode()).getFieldTypes().size();
++i) {
+ StructNode structNode = (StructNode) getTypeNode();
+ for (int i = 0; i < structNode.getFieldTypes().size(); ++i) {
structBuilder.addFields(getFieldLiteral(i).getLiteral());
+ structBuilder.addNames(structNode.getNames().get(i));
}
literalBuilder.setStruct(structBuilder.build());
}
diff --git
a/gluten-substrait/src/main/java/org/apache/gluten/substrait/type/StructNode.java
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/type/StructNode.java
index c9d69e9463..57deda94cc 100644
---
a/gluten-substrait/src/main/java/org/apache/gluten/substrait/type/StructNode.java
+++
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/type/StructNode.java
@@ -42,6 +42,10 @@ public class StructNode implements TypeNode, Serializable {
return types;
}
+ /**
+  * Returns the field names held by this struct type node.
+  *
+  * <p>The internal list is returned directly, not copied, so callers must not modify it.
+  *
+  * <p>NOTE(review): this list may be shorter than {@link #getFieldTypes()} if the node was built
+  * without names. Confirm against the constructors before indexing it by field position.
+  */
+ public List<String> getNames() {
+ return names;
+ }
+
@Override
public Type toProtobuf() {
Type.Struct.Builder structBuilder = Type.Struct.newBuilder();
diff --git
a/gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto
b/gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto
index 2d64726057..176a4e4c25 100644
---
a/gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto
+++
b/gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto
@@ -875,6 +875,7 @@ message Expression {
message Struct {
// A possibly heterogeneously typed list of literals
repeated Literal fields = 1;
+ repeated string names = 2;
}
message List {
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index 404d61940b..f4ebbed5bd 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -168,6 +168,13 @@ trait SparkPlanExecApi {
GenericExpressionTransformer(substraitExprName, children, expr)
}
+ /**
+ * Transform StructsToJson (`to_json`) to Substrait.
+ *
+ * The default forwards only the converted child to a GenericExpressionTransformer. Backends that
+ * must add arguments (such as the session time zone) or reject unsupported options override it.
+ */
+ def genToJsonTransformer(
+ substraitExprName: String,
+ child: ExpressionTransformer,
+ expr: StructsToJson): ExpressionTransformer = {
+ GenericExpressionTransformer(substraitExprName, child, expr)
+ }
+
/** Transform GetArrayItem to Substrait. */
def genGetArrayItemTransformer(
substraitExprName: String,
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index 913de09981..60e469f055 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -464,6 +464,7 @@ object GlutenConfig {
SHUFFLE_WRITER_BUFFER_SIZE.key,
SQLConf.LEGACY_SIZE_OF_NULL.key,
SQLConf.LEGACY_STATISTICAL_AGGREGATE.key,
+ SQLConf.JSON_GENERATOR_IGNORE_NULL_FIELDS.key,
"spark.io.compression.codec",
"spark.sql.decimalOperations.allowPrecisionLoss",
"spark.gluten.sql.columnar.backend.velox.bloomFilter.expectedNumItems",
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
index c53b11e9d7..a335a17c37 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
@@ -771,6 +771,12 @@ object ExpressionConverter extends SQLConfHelper with
Logging {
substraitExprName,
expr.children.map(replaceWithExpressionTransformer0(_, attributeSeq,
expressionsMap)),
j)
+ case s: StructsToJson =>
+ BackendsApiManager.getSparkPlanExecApiInstance.genToJsonTransformer(
+ substraitExprName,
+ replaceWithExpressionTransformer0(s.child, attributeSeq,
expressionsMap),
+ s
+ )
case u: UnBase64 if
SparkShimLoader.getSparkShims.unBase64FunctionFailsOnError(u) =>
throw new GlutenNotSupportException("UnBase64 with failOnError is not
supported in gluten.")
case ce if
BackendsApiManager.getSparkPlanExecApiInstance.expressionFlattenSupported(ce) =>
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/expression/JsonTupleExpressionTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/expression/JsonExpressionTransformer.scala
similarity index 84%
rename from
gluten-substrait/src/main/scala/org/apache/gluten/expression/JsonTupleExpressionTransformer.scala
rename to
gluten-substrait/src/main/scala/org/apache/gluten/expression/JsonExpressionTransformer.scala
index c5978f714b..fe08a57c43 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/expression/JsonTupleExpressionTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/expression/JsonExpressionTransformer.scala
@@ -21,7 +21,7 @@ import org.apache.gluten.substrait.`type`.ListNode
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.expression.{ExpressionBuilder,
ExpressionNode}
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.{Expression, StructsToJson}
import com.google.common.collect.Lists
@@ -53,3 +53,14 @@ case class JsonTupleExpressionTransformer(
}
}
}
+
+/**
+ * Expression transformer for Spark's `StructsToJson` (`to_json`).
+ *
+ * The converted input is always the first argument. When the Spark expression carries a time
+ * zone id, that id is appended as a literal argument.
+ */
+case class ToJsonTransformer(
+    substraitExprName: String,
+    child: ExpressionTransformer,
+    original: StructsToJson)
+  extends ExpressionTransformer {
+  override def children: Seq[ExpressionTransformer] =
+    child +: original.timeZoneId.map(zoneId => LiteralTransformer(zoneId)).toSeq
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]