This is an automated email from the ASF dual-hosted git repository.
andygrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 3ee5d65fab feat: 100% Spark-compatible JSON support via codegen
dispatcher (#4305)
3ee5d65fab is described below
commit 3ee5d65fab757807824bc74236df7199d4e4566c
Author: Andy Grove <[email protected]>
AuthorDate: Thu Jun 4 22:09:07 2026 -0600
feat: 100% Spark-compatible JSON support via codegen dispatcher (#4305)
---
.github/workflows/pr_build_linux.yml | 1 +
.github/workflows/pr_build_macos.yml | 1 +
.../expression-audits/json_funcs.md | 6 ++
.../user-guide/latest/compatibility/index.md | 1 +
.../source/user-guide/latest/compatibility/json.md | 56 ++++++++++
docs/source/user-guide/latest/expressions.md | 2 +-
docs/source/user-guide/latest/index.rst | 1 +
.../comet/codegen/CometBatchKernelCodegen.scala | 28 +++--
.../org/apache/comet/serde/CometScalaUDF.scala | 16 ++-
.../main/scala/org/apache/comet/serde/json.scala | 34 ++++--
.../scala/org/apache/comet/serde/strings.scala | 43 ++++----
.../scala/org/apache/comet/serde/structs.scala | 114 ++++++++++-----------
...k_reasons.sql => json_array_length_default.sql} | 18 +++-
.../expressions/string/get_json_object.sql | 2 +
.../expressions/struct/structs_to_json.sql | 77 ++++++++++++--
.../org/apache/comet/CometCodegenSourceSuite.scala | 11 +-
.../apache/comet/CometJsonExpressionSuite.scala | 16 +--
.../scala/org/apache/comet/CometJsonJvmSuite.scala | 70 +++++++++++++
18 files changed, 364 insertions(+), 133 deletions(-)
diff --git a/.github/workflows/pr_build_linux.yml
b/.github/workflows/pr_build_linux.yml
index 68025fd7ba..1cf91828c1 100644
--- a/.github/workflows/pr_build_linux.yml
+++ b/.github/workflows/pr_build_linux.yml
@@ -377,6 +377,7 @@ jobs:
org.apache.comet.CometMapExpressionSuite
org.apache.comet.CometCsvExpressionSuite
org.apache.comet.CometJsonExpressionSuite
+ org.apache.comet.CometJsonJvmSuite
org.apache.comet.SparkErrorConverterSuite
org.apache.comet.expressions.conditional.CometIfSuite
org.apache.comet.expressions.conditional.CometCoalesceSuite
diff --git a/.github/workflows/pr_build_macos.yml
b/.github/workflows/pr_build_macos.yml
index d0a03eeb75..41285dc8fd 100644
--- a/.github/workflows/pr_build_macos.yml
+++ b/.github/workflows/pr_build_macos.yml
@@ -193,6 +193,7 @@ jobs:
org.apache.comet.CometMapExpressionSuite
org.apache.comet.CometCsvExpressionSuite
org.apache.comet.CometJsonExpressionSuite
+ org.apache.comet.CometJsonJvmSuite
org.apache.comet.SparkErrorConverterSuite
org.apache.comet.expressions.conditional.CometIfSuite
org.apache.comet.expressions.conditional.CometCoalesceSuite
diff --git a/docs/source/contributor-guide/expression-audits/json_funcs.md
b/docs/source/contributor-guide/expression-audits/json_funcs.md
index 4184ceac22..689142e2a3 100644
--- a/docs/source/contributor-guide/expression-audits/json_funcs.md
+++ b/docs/source/contributor-guide/expression-audits/json_funcs.md
@@ -33,6 +33,12 @@
- Spark 4.1.1 (audited 2026-05-27): identical to 4.0.1.
- Known incompatibility: Spark accepts single-quoted JSON and unescaped
control characters; Comet's native parser (built on `serde_json`) rejects both,
so those inputs require
`spark.comet.expression.GetJsonObject.allowIncompatible=true` and may still
produce different results. Non-default Spark 4.0 string collations are not
propagated (https://github.com/apache/datafusion-comet/issues/2190).
+## json_array_length
+
+- `LengthOfJsonArray`: `UnaryExpression with ExpectsInputTypes with CodegenFallback`; `inputTypes = Seq(StringType)`, `dataType = IntegerType`. Returns NULL for NULL input, invalid JSON, or non-array JSON; otherwise returns the number of top-level array elements.
+- Runs through the codegen dispatcher by default for byte-exact Spark
compatibility.
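+- Known incompatibility (native path only): the native path, enabled via `spark.comet.expression.LengthOfJsonArray.allowIncompatible=true`, is built on `serde_json` and requires strict JSON, so single-quoted JSON, unescaped control characters, and trailing content may produce different results than Spark. The default codegen-dispatcher path runs Spark's own implementation and handles these inputs exactly as Spark does.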
+
## to_json
- Partial native support; options and map/array inputs fall back.
diff --git a/docs/source/user-guide/latest/compatibility/index.md
b/docs/source/user-guide/latest/compatibility/index.md
index 542d6902ec..46578d1683 100644
--- a/docs/source/user-guide/latest/compatibility/index.md
+++ b/docs/source/user-guide/latest/compatibility/index.md
@@ -28,4 +28,5 @@ This guide documents areas where Comet's behavior is known to
differ from Spark.
- **Regular expressions**: differences between the Rust regexp crate and
Java's regex engine.
- **Operators**: operator-level compatibility notes, including window
functions and round-robin partitioning.
- **Expressions**: per-expression compatibility notes, including cast.
+- **JSON**: choosing between the native and Spark-compatible engines for JSON expressions.
- **Spark versions**: version-specific known issues and limitations.
diff --git a/docs/source/user-guide/latest/compatibility/json.md
b/docs/source/user-guide/latest/compatibility/json.md
new file mode 100644
index 0000000000..2dc79c27a8
--- /dev/null
+++ b/docs/source/user-guide/latest/compatibility/json.md
@@ -0,0 +1,56 @@
+<!---
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# JSON Compatibility
+
+Comet can evaluate JSON expressions (`get_json_object`, `from_json`, `to_json`,
+`json_array_length`) two ways:
+
+- **Codegen dispatcher (default):** Spark's own `doGenCode` for the expression
+ runs inside the Comet pipeline (via Comet's Arrow-direct codegen dispatcher),
+ giving byte-exact compatibility with Spark at the cost of a JNI round trip per
+ batch. This path requires the codegen dispatcher
+ (`spark.comet.exec.scalaUDF.codegen.enabled`, enabled by default); if the
+ dispatcher is disabled, the operator falls back to Spark.
+- **Native (rust) path:** the native DataFusion implementation. Faster, but has
+ known compatibility gaps with Spark on certain inputs, so it is **opt-in per
+ expression** via the expression's `allowIncompatible` config. Any expression or
+ input case with no native implementation falls back to the codegen dispatcher.
+
+## Expression coverage
+
+| SQL | Native (rust) path | Opt-in config |
+| --- | --- | --- |
+| `get_json_object` | Supported, with gaps on single-quoted JSON and unescaped control characters | `spark.comet.expression.GetJsonObject.allowIncompatible` |
+| `from_json` | Supported with restrictions (PERMISSIVE mode only, simple schema types only) | `spark.comet.expression.JsonToStructs.allowIncompatible` |
+| `to_json` | Supported for struct inputs only, no options | `spark.comet.expression.StructsToJson.allowIncompatible` |
+| `json_array_length` | Supported, with gaps on single-quoted JSON, unescaped control characters, and trailing content | `spark.comet.expression.LengthOfJsonArray.allowIncompatible` |
+
+When the native path is enabled but an expression or input case has no native
+implementation (for example `to_json` with map or array inputs, or `from_json`
+with an unsupported schema), Comet falls back to the codegen dispatcher for that
+case.
+
+## When to use the native path
+
+- You want the faster native path and your inputs avoid the known compatibility
+ gaps above.
+- Enable it per expression, for example
+ `spark.comet.expression.GetJsonObject.allowIncompatible=true`. Cases the native
+ path does not cover still fall back to the codegen dispatcher.
diff --git a/docs/source/user-guide/latest/expressions.md
b/docs/source/user-guide/latest/expressions.md
index ed190bebb9..79093ddb9a 100644
--- a/docs/source/user-guide/latest/expressions.md
+++ b/docs/source/user-guide/latest/expressions.md
@@ -338,7 +338,7 @@ expression-level). The `outer` variants are wired but
marked `Incompatible`; the
| --- | --- | --- |
| `from_json` | β
| Falls back by default; opt-in via allowIncompatible
([audit](../../contributor-guide/expression-audits/json_funcs.md#from_json)) |
| `get_json_object` | β
| Some inputs need allowIncompatible
([audit](../../contributor-guide/expression-audits/json_funcs.md#get_json_object))
|
-| `json_array_length` | π | tracking
[#4098](https://github.com/apache/datafusion-comet/issues/4098) |
+| `json_array_length` | ✅ | Runs via the codegen dispatcher by default; native path is opt-in via allowIncompatible ([audit](../../contributor-guide/expression-audits/json_funcs.md#json_array_length)) |
| `json_object_keys` | π |
[#3161](https://github.com/apache/datafusion-comet/issues/3161) |
| `json_tuple` | π |
[#3160](https://github.com/apache/datafusion-comet/issues/3160) |
| `schema_of_json` | π |
[#3163](https://github.com/apache/datafusion-comet/issues/3163) |
diff --git a/docs/source/user-guide/latest/index.rst
b/docs/source/user-guide/latest/index.rst
index 00f770c27e..621fb9d73e 100644
--- a/docs/source/user-guide/latest/index.rst
+++ b/docs/source/user-guide/latest/index.rst
@@ -62,6 +62,7 @@ to read more.
compatibility/regex
compatibility/operators
compatibility/expressions/index
+ compatibility/json
compatibility/spark-versions
.. toctree::
diff --git
a/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegen.scala
b/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegen.scala
index 25bd70898e..1c990835bb 100644
---
a/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegen.scala
+++
b/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegen.scala
@@ -23,7 +23,7 @@ import org.apache.arrow.vector._
import org.apache.arrow.vector.complex.{ListVector, MapVector, StructVector}
import org.apache.arrow.vector.types.pojo.Field
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression,
HigherOrderFunction, LambdaFunction, Literal, NamedLambdaVariable, Unevaluable}
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression,
Literal, Unevaluable}
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -107,9 +107,8 @@ object CometBatchKernelCodegen extends Logging with
CometExprTraitShim {
* back cleanly rather than crashing the Janino compile at execute time.
*
* Checks every `BoundReference`'s data type and the root `expr.dataType`
against
- * [[isSupportedDataType]], rejects aggregates / generators /
`CodegenFallback` (other than
- * HOFs, which are admitted), and gates total nested-field count on
- * `spark.sql.codegen.maxFields`.
+ * [[isSupportedDataType]], rejects aggregates / generators / `Unevaluable`,
and gates total
+ * nested-field count on `spark.sql.codegen.maxFields`.
*/
def canHandle(boundExpr: Expression): Option[String] = {
if (!isSupportedDataType(boundExpr.dataType)) {
@@ -127,12 +126,15 @@ object CometBatchKernelCodegen extends Logging with
CometExprTraitShim {
s"codegen dispatch: too many nested fields ($totalFields > " +
s"spark.sql.codegen.maxFields=$maxFields)")
}
- // HOFs are `CodegenFallback` but admitted: `CodegenFallback.doGenCode`
emits one
- // `((Expression) references[N]).eval(row)` call site per HOF. The kernel
dispatches to the
- // HOF's interpreted `eval`, which mutates `NamedLambdaVariable.value` per
element and reads
- // the input array through the kernel's typed Arrow getters. Per-task
`boundExpr` isolation
- // in `CometScalaUDFCodegen.kernelCache` prevents concurrent partitions
from racing on the
- // lambda variable's `AtomicReference`. See `CometCodegenHOFSuite`.
+ // `CodegenFallback` expressions are admitted. `CodegenFallback.doGenCode`
emits one
+ // `((Expression) references[N]).eval(row)` call site per expression. The
kernel dispatches
+ // to the expression's interpreted `eval` against `row` aliased to `this`,
so the eval reads
+ // through the kernel's typed Arrow getters. This covers
`HigherOrderFunction` (which mutates
+ // `NamedLambdaVariable.value` per element; see `CometCodegenHOFSuite`) as
well as other
+ // CodegenFallback expressions like `JsonToStructs` / `StructsToJson`
whose `eval(row)`
+ // simply calls `row.get(0, dataType)`. Per-task `boundExpr` isolation in
+ // `CometScalaUDFCodegen.kernelCache` prevents concurrent partitions from
racing on shared
+ // state inside the expression.
//
// Nondeterministic / stateful expressions are accepted: each cache entry
holds one kernel
// instance with a single `init(partitionIndex)` call, so `Rand` /
`MonotonicallyIncreasingID`
@@ -150,10 +152,6 @@ object CometBatchKernelCodegen extends Logging with
CometExprTraitShim {
boundExpr.find {
case _:
org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction => true
case _: org.apache.spark.sql.catalyst.expressions.Generator => true
- case _: HigherOrderFunction => false
- case _: LambdaFunction => false
- case _: NamedLambdaVariable => false
- case _: CodegenFallback => true
case u: Unevaluable if isCodegenInertUnevaluable(u) => false
case _: Unevaluable => true
case _ => false
@@ -161,7 +159,7 @@ object CometBatchKernelCodegen extends Logging with
CometExprTraitShim {
case Some(bad) =>
return Some(
s"codegen dispatch: expression ${bad.getClass.getSimpleName} not
supported " +
- "(aggregate, generator, codegen-fallback, or unevaluable)")
+ "(aggregate, generator, or unevaluable)")
case None =>
}
val badRef = boundExpr.collectFirst {
diff --git a/spark/src/main/scala/org/apache/comet/serde/CometScalaUDF.scala
b/spark/src/main/scala/org/apache/comet/serde/CometScalaUDF.scala
index a1d5be84ff..3df275f057 100644
--- a/spark/src/main/scala/org/apache/comet/serde/CometScalaUDF.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/CometScalaUDF.scala
@@ -20,7 +20,7 @@
package org.apache.comet.serde
import org.apache.spark.SparkEnv
-import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, AttributeSeq, BindReferences, Expression, Literal, ScalaUDF}
+import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, AttributeSeq, BindReferences, Expression, Literal,
RuntimeReplaceable, ScalaUDF}
import org.apache.spark.sql.types.BinaryType
import org.apache.comet.CometConf
@@ -78,10 +78,20 @@ object CometScalaUDF extends CometExpressionSerde[ScalaUDF]
{
return None
}
+ // `RuntimeReplaceable` expressions (e.g. Spark 4's `StructsToJson`) have
a `doGenCode` that
+ // always throws "Cannot generate code for expression". Catalyst's
`ReplaceExpressions` rule
+ // normally rewrites them to their `replacement` form before codegen runs.
Comet's serde
+ // sometimes works with the pre-rewrite form (via shim reconstruction) for
matching purposes,
+ // so unwrap to the replacement here before binding so the kernel compiles.
+ val target = expr match {
+ case rr: RuntimeReplaceable => rr.replacement
+ case other => other
+ }
+
// Bind against only the AttributeReferences the tree actually reads, so
ordinals align with
// the data args we ship.
- val attrs = expr.collect { case a: AttributeReference => a }.distinct
- val boundExpr = BindReferences.bindReference(expr, AttributeSeq(attrs))
+ val attrs = target.collect { case a: AttributeReference => a }.distinct
+ val boundExpr = BindReferences.bindReference(target, AttributeSeq(attrs))
// Gate at plan time. Surface the reason via withFallbackReason rather
than crashing Janino
// at execute.
diff --git a/spark/src/main/scala/org/apache/comet/serde/json.scala
b/spark/src/main/scala/org/apache/comet/serde/json.scala
index 5f296599d6..0eac414d13 100644
--- a/spark/src/main/scala/org/apache/comet/serde/json.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/json.scala
@@ -19,18 +19,30 @@
package org.apache.comet.serde
-import org.apache.spark.sql.catalyst.expressions.LengthOfJsonArray
+import org.apache.spark.sql.catalyst.expressions.{Attribute, LengthOfJsonArray}
-object CometLengthOfJsonArray
- extends CometScalarFunction[LengthOfJsonArray]("json_array_length") {
+import org.apache.comet.CometConf
+import org.apache.comet.serde.ExprOuterClass.Expr
+import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal,
optExprWithFallbackReason, scalarFunctionExprToProto}
- private val IncompatibleReason: String =
- "Spark's lenient JSON parser allows single quotes, unescaped controls, " +
- "and trailing content, " +
- "while Comet's serde_json requires strict JSON."
-
- override def getIncompatibleReasons(): Seq[String] = Seq(IncompatibleReason)
+/**
+ * `json_array_length` runs Spark's own implementation through the codegen
dispatcher by default,
+ * for byte-exact results. The native (rust) path is faster but incompatible
with Spark for
+ * single-quoted JSON, unescaped control characters, and trailing content, so
it is opt-in via
+ * `spark.comet.expression.LengthOfJsonArray.allowIncompatible`; otherwise it
rides the codegen
+ * dispatcher via [[CometCodegenDispatch]].
+ */
+object CometLengthOfJsonArray extends CometCodegenDispatch[LengthOfJsonArray] {
- override def getSupportLevel(expr: LengthOfJsonArray): SupportLevel =
Incompatible(
- Some(IncompatibleReason))
+ override def convert(
+ expr: LengthOfJsonArray,
+ inputs: Seq[Attribute],
+ binding: Boolean): Option[Expr] =
+ if (CometConf.isExprAllowIncompat(getExprConfigName(expr))) {
+ val childExpr = expr.children.map(exprToProtoInternal(_, inputs,
binding))
+ val optExpr = scalarFunctionExprToProto("json_array_length", childExpr:
_*)
+ optExprWithFallbackReason(optExpr, expr, expr.children: _*)
+ } else {
+ super.convert(expr, inputs, binding)
+ }
}
diff --git a/spark/src/main/scala/org/apache/comet/serde/strings.scala
b/spark/src/main/scala/org/apache/comet/serde/strings.scala
index fb23ca87d6..ee961a597f 100644
--- a/spark/src/main/scala/org/apache/comet/serde/strings.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/strings.scala
@@ -480,31 +480,32 @@ object CometStringSplit extends
CometExpressionSerde[StringSplit] {
}
}
-object CometGetJsonObject extends CometExpressionSerde[GetJsonObject] {
-
- private val incompatReason =
- "Spark allows single-quoted JSON and unescaped control characters which
Comet does not" +
- " support"
-
- override def getIncompatibleReasons(): Seq[String] = Seq(incompatReason)
-
- override def getSupportLevel(expr: GetJsonObject): SupportLevel =
- Incompatible(Some(incompatReason))
+/**
+ * `get_json_object` runs Spark's own implementation through the codegen
dispatcher by default,
+ * for byte-exact results. The native (rust) path is faster but incompatible
with Spark for
+ * single-quoted JSON and unescaped control characters, so it is opt-in via
+ * `spark.comet.expression.GetJsonObject.allowIncompatible`; otherwise it
rides the codegen
+ * dispatcher via [[CometCodegenDispatch]].
+ */
+object CometGetJsonObject extends CometCodegenDispatch[GetJsonObject] {
override def convert(
expr: GetJsonObject,
inputs: Seq[Attribute],
- binding: Boolean): Option[Expr] = {
- val jsonExpr = exprToProtoInternal(expr.json, inputs, binding)
- val pathExpr = exprToProtoInternal(expr.path, inputs, binding)
- val optExpr = scalarFunctionExprToProtoWithReturnType(
- "get_json_object",
- expr.dataType,
- false,
- jsonExpr,
- pathExpr)
- optExprWithFallbackReason(optExpr, expr, expr.json, expr.path)
- }
+ binding: Boolean): Option[Expr] =
+ if (CometConf.isExprAllowIncompat(getExprConfigName(expr))) {
+ val jsonExpr = exprToProtoInternal(expr.json, inputs, binding)
+ val pathExpr = exprToProtoInternal(expr.path, inputs, binding)
+ val optExpr = scalarFunctionExprToProtoWithReturnType(
+ "get_json_object",
+ expr.dataType,
+ false,
+ jsonExpr,
+ pathExpr)
+ optExprWithFallbackReason(optExpr, expr, expr.json, expr.path)
+ } else {
+ super.convert(expr, inputs, binding)
+ }
}
trait CommonStringExprs {
diff --git a/spark/src/main/scala/org/apache/comet/serde/structs.scala
b/spark/src/main/scala/org/apache/comet/serde/structs.scala
index 3c7dc17844..c38b12bb6d 100644
--- a/spark/src/main/scala/org/apache/comet/serde/structs.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/structs.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute,
CreateNamedStruct,
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
+import org.apache.comet.CometConf
import org.apache.comet.CometSparkSessionExtensions.withFallbackReason
import org.apache.comet.DataTypeSupport
import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal,
serializeDataType}
@@ -113,42 +114,45 @@ object CometGetArrayStructFields extends
CometExpressionSerde[GetArrayStructFiel
}
}
-object CometStructsToJson extends CometExpressionSerde[StructsToJson] {
+/**
+ * `to_json` runs Spark's own implementation through the codegen dispatcher by
default, for
+ * byte-exact compatibility. The native (rust) path is faster but only covers
struct inputs of
+ * supported types with no options, so it is opt-in via
+ * `spark.comet.expression.StructsToJson.allowIncompatible`; any case it does
not cover
+ * (unsupported types or options) falls through to the codegen dispatcher via
+ * [[CometCodegenDispatch]].
+ */
+object CometStructsToJson extends CometCodegenDispatch[StructsToJson] {
- override def getSupportLevel(expr: StructsToJson): SupportLevel = {
- if (expr.options.nonEmpty) {
- return Unsupported(Some("StructsToJson with options is not supported"))
- }
- val dataType = expr.child.dataType
- if (!isSupportedType(dataType)) {
- return Unsupported(Some(s"Struct type: $dataType contains unsupported
types"))
- }
- Compatible()
- }
+ private def nativeSupported(expr: StructsToJson): Boolean =
+ expr.options.isEmpty && isSupportedType(expr.child.dataType)
override def convert(
expr: StructsToJson,
inputs: Seq[Attribute],
- binding: Boolean): Option[ExprOuterClass.Expr] = {
- val ignoreNullFields = SQLConf.get.jsonGeneratorIgnoreNullFields
- exprToProtoInternal(expr.child, inputs, binding) match {
- case Some(p) =>
- val toJson = ExprOuterClass.ToJson
- .newBuilder()
- .setChild(p)
- .setTimezone(expr.timeZoneId.getOrElse("UTC"))
- .setIgnoreNullFields(ignoreNullFields)
- .build()
- Some(
- ExprOuterClass.Expr
+ binding: Boolean): Option[ExprOuterClass.Expr] =
+ if (CometConf.isExprAllowIncompat(getExprConfigName(expr)) &&
nativeSupported(expr)) {
+ val ignoreNullFields = SQLConf.get.jsonGeneratorIgnoreNullFields
+ exprToProtoInternal(expr.child, inputs, binding) match {
+ case Some(p) =>
+ val toJson = ExprOuterClass.ToJson
.newBuilder()
- .setToJson(toJson)
- .build())
- case _ =>
- withFallbackReason(expr, expr.child)
- None
+ .setChild(p)
+ .setTimezone(expr.timeZoneId.getOrElse("UTC"))
+ .setIgnoreNullFields(ignoreNullFields)
+ .build()
+ Some(
+ ExprOuterClass.Expr
+ .newBuilder()
+ .setToJson(toJson)
+ .build())
+ case _ =>
+ withFallbackReason(expr, expr.child)
+ None
+ }
+ } else {
+ super.convert(expr, inputs, binding)
}
- }
def isSupportedType(dt: DataType): Boolean = {
dt match {
@@ -170,43 +174,24 @@ object CometStructsToJson extends
CometExpressionSerde[StructsToJson] {
}
}
-object CometJsonToStructs extends CometExpressionSerde[JsonToStructs] {
-
- override def getIncompatibleReasons(): Seq[String] = Seq(
- "Partially implemented and not comprehensively tested")
-
- override def getUnsupportedReasons(): Seq[String] = Seq("Requires an
explicit schema")
+/**
+ * `from_json` runs Spark's own implementation through the codegen dispatcher
by default. The
+ * native (rust) path is partially implemented and not comprehensively tested,
so it is opt-in via
+ * `spark.comet.expression.JsonToStructs.allowIncompatible` and only for
schemas it supports; any
+ * other case falls through to the codegen dispatcher via
[[CometCodegenDispatch]].
+ */
+object CometJsonToStructs extends CometCodegenDispatch[JsonToStructs] {
- override def getSupportLevel(expr: JsonToStructs): SupportLevel = {
- // this feature is partially implemented and not comprehensively tested yet
- Incompatible()
- }
+ private def nativeSupported(expr: JsonToStructs): Boolean =
+ expr.schema != null && isSupportedSchema(expr.schema)
override def convert(
expr: JsonToStructs,
inputs: Seq[Attribute],
binding: Boolean): Option[ExprOuterClass.Expr] = {
- if (expr.schema == null) {
- withFallbackReason(expr, "from_json requires explicit schema")
- return None
- }
-
- def isSupportedType(dt: DataType): Boolean = {
- dt match {
- case StructType(fields) =>
- fields.nonEmpty && fields.forall(f => isSupportedType(f.dataType))
- case DataTypes.IntegerType | DataTypes.LongType | DataTypes.FloatType |
- DataTypes.DoubleType | DataTypes.BooleanType |
DataTypes.StringType =>
- true
- case _ => false
- }
- }
-
- val schemaType = expr.schema
- if (!isSupportedType(schemaType)) {
- withFallbackReason(expr, "from_json: Unsupported schema type")
- return None
+ if (!(CometConf.isExprAllowIncompat(getExprConfigName(expr)) &&
nativeSupported(expr))) {
+ return super.convert(expr, inputs, binding)
}
val options = expr.options
@@ -228,7 +213,7 @@ object CometJsonToStructs extends
CometExpressionSerde[JsonToStructs] {
// Convert child expression and schema to protobuf
for {
childProto <- exprToProtoInternal(expr.child, inputs, binding)
- schemaProto <- serializeDataType(schemaType)
+ schemaProto <- serializeDataType(expr.schema)
} yield {
val fromJson = ExprOuterClass.FromJson
.newBuilder()
@@ -239,6 +224,15 @@ object CometJsonToStructs extends
CometExpressionSerde[JsonToStructs] {
ExprOuterClass.Expr.newBuilder().setFromJson(fromJson).build()
}
}
+
+ private def isSupportedSchema(dt: DataType): Boolean = dt match {
+ case StructType(fields) =>
+ fields.nonEmpty && fields.forall(f => isSupportedSchema(f.dataType))
+ case DataTypes.IntegerType | DataTypes.LongType | DataTypes.FloatType |
DataTypes.DoubleType |
+ DataTypes.BooleanType | DataTypes.StringType =>
+ true
+ case _ => false
+ }
}
object CometStructsToCsv extends CometExpressionSerde[StructsToCsv] {
diff --git
a/spark/src/test/resources/sql-tests/expressions/json/json_array_length_fallback_reasons.sql
b/spark/src/test/resources/sql-tests/expressions/json/json_array_length_default.sql
similarity index 51%
rename from
spark/src/test/resources/sql-tests/expressions/json/json_array_length_fallback_reasons.sql
rename to
spark/src/test/resources/sql-tests/expressions/json/json_array_length_default.sql
index b1775368c3..0471bf8471 100644
---
a/spark/src/test/resources/sql-tests/expressions/json/json_array_length_fallback_reasons.sql
+++
b/spark/src/test/resources/sql-tests/expressions/json/json_array_length_default.sql
@@ -15,7 +15,21 @@
-- specific language governing permissions and limitations
-- under the License.
+-- Default engine: json_array_length runs Spark's own implementation through
the codegen
+-- dispatcher, so it stays in the Comet pipeline while matching Spark
byte-for-byte, including for
+-- inputs the native (rust) path cannot handle compatibly. The native path is
opt-in via
+-- spark.comet.expression.LengthOfJsonArray.allowIncompatible=true (see
json_array_length.sql).
+
-- Config: spark.comet.expression.LengthOfJsonArray.allowIncompatible=false
-query expect_fallback(Spark's lenient JSON parser allows single quotes,
unescaped controls, and trailing content, while Comet's serde_json requires
strict JSON.)
-SELECT json_array_length("[{'key':'value'}]")
\ No newline at end of file
+-- single-quoted JSON: the native serde_json path is incompatible here, but
Spark's lenient parser
+-- (run via the dispatcher) accepts it, so the default path matches Spark.
+query
+SELECT json_array_length("[{'key':'value'}]")
+
+-- trailing content: likewise handled compatibly by the default path.
+query
+SELECT json_array_length('[1,2,3] trailing')
+
+query
+SELECT json_array_length('[1,2,3,4]'), json_array_length('not an array'),
json_array_length(NULL)
diff --git
a/spark/src/test/resources/sql-tests/expressions/string/get_json_object.sql
b/spark/src/test/resources/sql-tests/expressions/string/get_json_object.sql
index edbebbdb77..ab21d4d375 100644
--- a/spark/src/test/resources/sql-tests/expressions/string/get_json_object.sql
+++ b/spark/src/test/resources/sql-tests/expressions/string/get_json_object.sql
@@ -15,6 +15,8 @@
-- specific language governing permissions and limitations
-- under the License.
+-- Opt in to the native (rust) get_json_object path. Without this,
get_json_object runs through the
+-- codegen dispatcher (Spark's own implementation) instead.
-- Config: spark.comet.expression.GetJsonObject.allowIncompatible=true
-- ConfigMatrix: parquet.enable.dictionary=false,true
diff --git a/spark/src/test/resources/sql-tests/expressions/struct/structs_to_json.sql b/spark/src/test/resources/sql-tests/expressions/struct/structs_to_json.sql
index 3cfd636d25..b6520d61f8 100644
--- a/spark/src/test/resources/sql-tests/expressions/struct/structs_to_json.sql
+++ b/spark/src/test/resources/sql-tests/expressions/struct/structs_to_json.sql
@@ -15,23 +15,82 @@
-- specific language governing permissions and limitations
-- under the License.
+-- Opt in to the native (rust) to_json path. Without this, to_json runs through the codegen
+-- dispatcher (Spark's own implementation) instead.
+-- Config: spark.comet.expression.StructsToJson.allowIncompatible=true
+-- ignoreNullFields changes whether null struct fields are emitted, so exercise both values.
-- ConfigMatrix: spark.sql.jsonGenerator.ignoreNullFields=false,true
statement
-CREATE TABLE test_to_json(a int, b string, f float, d double) USING parquet
+CREATE TABLE test_to_json(
+ i int, s string, f float, d double, bl boolean, bt tinyint, sh smallint, lng bigint)
+USING parquet
statement
-INSERT INTO test_to_json VALUES (1, 'hello', cast('NaN' as float), cast('Infinity' as double)), (NULL, NULL, cast('NaN' as float), cast('Infinity' as double)), (NULL, NULL, NULL, NULL), (0, '', 0.0, 0.0), (2, 'hello', cast('NaN' as float), cast('-Infinity' as double))
+INSERT INTO test_to_json VALUES
+ (1, 'hello', cast('NaN' as float), cast('Infinity' as double), true, cast(127 as tinyint), cast(32767 as smallint), 9223372036854775807),
+ (NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL),
+ (0, '', cast(0.0 as float), 0.0, false, cast(0 as tinyint), cast(0 as smallint), 0),
+ (-2147483648, 'say "hi"', cast('-Infinity' as float), cast('-Infinity' as double), true, cast(-128 as tinyint), cast(-32768 as smallint), -9223372036854775808),
+ (2147483647, 'café 中文', cast('NaN' as float), cast('NaN' as double), false, cast(-1 as tinyint), cast(-1 as smallint), -1)
+-- every natively-supported type as a top-level struct field
query
-SELECT to_json(named_struct('a', a, 'b', b, 'f', f, 'd', d)) FROM test_to_json
+SELECT to_json(named_struct(
+ 'i', i, 's', s, 'f', f, 'd', d, 'bl', bl, 'bt', bt, 'sh', sh, 'lng', lng)) FROM test_to_json
--- literal arguments
+-- each type individually, column argument
query
-SELECT to_json(named_struct('a', 1, 'b', 'hello'))
+SELECT to_json(named_struct('i', i)) FROM test_to_json
-query expect_fallback(StructsToJson with options is not supported)
-SELECT to_json(named_struct('a', a, 'b', b), map('timestampFormat', 'dd/MM/yyyy')) FROM test_to_json
+query
+SELECT to_json(named_struct('s', s)) FROM test_to_json
+
+query
+SELECT to_json(named_struct('f', f)) FROM test_to_json
+
+query
+SELECT to_json(named_struct('d', d)) FROM test_to_json
+
+query
+SELECT to_json(named_struct('bl', bl)) FROM test_to_json
+
+query
+SELECT to_json(named_struct('bt', bt)) FROM test_to_json
+
+query
+SELECT to_json(named_struct('sh', sh)) FROM test_to_json
+
+query
+SELECT to_json(named_struct('lng', lng)) FROM test_to_json
+
+-- nested struct of supported types
+query
+SELECT to_json(named_struct(
+ 'inner', named_struct('i', i, 's', s, 'bl', bl),
+ 'lng', lng)) FROM test_to_json
+
+-- deeply nested struct
+query
+SELECT to_json(named_struct(
+ 'a', named_struct('b', named_struct('c', i, 'd', s)))) FROM test_to_json
+
+-- literal arguments covering each type
+query
+SELECT to_json(named_struct(
+ 'i', 1, 's', 'hello', 'f', cast(1.5 as float), 'd', 2.5,
+ 'bl', true, 'bt', cast(7 as tinyint), 'sh', cast(700 as smallint), 'lng', 9000000000))
+
+-- literal struct with a value that requires JSON string escaping
+query
+SELECT to_json(named_struct('s', 'quote " and slash \\ and tab\there'))
-query expect_fallback(Struct type: StructType(StructField(a,IntegerType,true),StructField(b,ArrayType(StringType,true),false)) contains unsupported types)
-SELECT to_json(named_struct('a', a, 'b', array(b))) FROM test_to_json
+-- to_json with options is not supported by the native (rust) path, so it routes to the codegen
+-- dispatcher, which runs Spark's own implementation inside Comet (still native Comet execution).
+query
+SELECT to_json(named_struct('i', i, 's', s), map('timestampFormat', 'dd/MM/yyyy')) FROM test_to_json
+
+-- to_json with an array field is not supported by the native (rust) path, so it likewise routes to
+-- the codegen dispatcher.
+query
+SELECT to_json(named_struct('i', i, 'arr', array(s))) FROM test_to_json
diff --git a/spark/src/test/scala/org/apache/comet/CometCodegenSourceSuite.scala b/spark/src/test/scala/org/apache/comet/CometCodegenSourceSuite.scala
index b9c5175117..9416861da4 100644
--- a/spark/src/test/scala/org/apache/comet/CometCodegenSourceSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometCodegenSourceSuite.scala
@@ -166,13 +166,16 @@ class CometCodegenSourceSuite extends AnyFunSuite {
s"got:\n$src")
}
- test("canHandle rejects CodegenFallback expressions") {
+ test("canHandle accepts CodegenFallback expressions (delegates to eval(row))") {
+ // CodegenFallback.doGenCode emits ((Expression) references[N]).eval(row) which is the same
+ // mechanism that backs HigherOrderFunction support: the eval reads through the kernel's typed
+ // Arrow getters via the row alias. Other CodegenFallback expressions (JsonToStructs,
+ // StructsToJson, ...) ride the same path.
val expr = FakeCodegenFallback(BoundReference(0, StringType, nullable = true))
val reason = CometBatchKernelCodegen.canHandle(expr)
- assert(reason.isDefined, "expected canHandle to reject CodegenFallback")
assert(
- reason.get.contains("FakeCodegenFallback"),
- s"expected reason to name the rejected expression class; got: ${reason.get}")
+ reason.isEmpty,
+ s"expected canHandle to accept CodegenFallback; got rejection: ${reason.getOrElse("")}")
}
test("canHandle accepts Nondeterministic expressions (per-partition kernel handles state)") {
diff --git a/spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala
index c3446f6264..2356a985ef 100644
--- a/spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala
@@ -39,6 +39,8 @@ class CometJsonExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe
pos: Position): Unit = {
super.test(testName, testTags: _*) {
withSQLConf(
+ // This suite exercises the native (rust) JSON path, which is opt-in per expression via
+ // `allowIncompatible`. By default these expressions run through the codegen dispatcher.
CometConf.getExprAllowIncompatConfigKey(classOf[JsonToStructs]) -> "true",
CometConf.getExprAllowIncompatConfigKey(classOf[StructsToJson]) -> "true") {
testFun
@@ -70,16 +72,16 @@ class CometJsonExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe
}
}
- test("to_json - fallback reasons") {
+ test("to_json - options and unsupported native types route to the codegen engine") {
withTable("t") {
sql("CREATE TABLE t(a INT, b STRING) USING parquet")
sql("INSERT INTO t VALUES (1, 'hello')")
- checkSparkAnswerAndFallbackReason(
- "SELECT to_json(named_struct('a', a, 'b', b), map('timestampFormat', 'dd/MM/yyyy')) FROM t",
- "StructsToJson with options is not supported")
- checkSparkAnswerAndFallbackReason(
- "SELECT to_json(named_struct('b', array(b))) FROM t",
- "Struct type: StructType(StructField(b,ArrayType(StringType,true),false)) contains unsupported types")
+ // The native (rust) path does not support to_json with options or array/map types. Even with
+ // allowIncompatible enabled, these cases fall back to the codegen dispatcher, which runs
+ // Spark's own implementation inside the Comet pipeline rather than falling back to Spark.
+ checkSparkAnswerAndOperator(
+ "SELECT to_json(named_struct('a', a, 'b', b), map('timestampFormat', 'dd/MM/yyyy')) FROM t")
+ checkSparkAnswerAndOperator("SELECT to_json(named_struct('b', array(b))) FROM t")
}
}
diff --git a/spark/src/test/scala/org/apache/comet/CometJsonJvmSuite.scala b/spark/src/test/scala/org/apache/comet/CometJsonJvmSuite.scala
new file mode 100644
index 0000000000..4a4df2e9b1
--- /dev/null
+++ b/spark/src/test/scala/org/apache/comet/CometJsonJvmSuite.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.CometTestBase
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+
+class CometJsonJvmSuite extends CometTestBase with AdaptiveSparkPlanHelper {
+
+ // No per-expression `allowIncompatible` is set, so all JSON expressions run through the
+ // codegen dispatcher (Spark's own code) rather than the native rust path.
+ override protected def sparkConf: SparkConf =
+ super.sparkConf
+ .set(CometConf.COMET_SCALA_UDF_CODEGEN_ENABLED.key, "true")
+
+ private val rows = Seq(
+ """{"a":1,"b":"x","arr":[1,2,3]}""",
+ """{"a":2,"b":"y","arr":[]}""",
+ """{"a":null,"b":"z","arr":[10]}""",
+ null,
+ """not json""")
+
+ private def withJsonTable(f: => Unit): Unit = {
+ withTable("t") {
+ sql("CREATE TABLE t (j STRING) USING parquet")
+ val sqlRows = rows
+ .map(v => if (v == null) "(NULL)" else s"('${v.replace("'", "''")}')")
+ .mkString(", ")
+ sql(s"INSERT INTO t VALUES $sqlRows")
+ f
+ }
+ }
+
+ test("get_json_object via JVM engine") {
+ withJsonTable {
+ checkSparkAnswerAndOperator(sql("SELECT get_json_object(j, '$.a') FROM t"))
+ checkSparkAnswerAndOperator(sql("SELECT get_json_object(j, '$.b') FROM t"))
+ }
+ }
+
+ test("from_json with explicit schema via JVM engine") {
+ withJsonTable {
+ checkSparkAnswerAndOperator(sql("SELECT from_json(j, 'a INT, b STRING') FROM t"))
+ }
+ }
+
+ test("to_json round-trip via JVM engine") {
+ withJsonTable {
+ checkSparkAnswerAndOperator(sql("SELECT to_json(from_json(j, 'a INT, b STRING')) FROM t"))
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]