This is an automated email from the ASF dual-hosted git repository.

andygrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 3ee5d65fab feat: 100% Spark-compatible JSON support via codegen 
dispatcher (#4305)
3ee5d65fab is described below

commit 3ee5d65fab757807824bc74236df7199d4e4566c
Author: Andy Grove <[email protected]>
AuthorDate: Thu Jun 4 22:09:07 2026 -0600

    feat: 100% Spark-compatible JSON support via codegen dispatcher (#4305)
---
 .github/workflows/pr_build_linux.yml               |   1 +
 .github/workflows/pr_build_macos.yml               |   1 +
 .../expression-audits/json_funcs.md                |   6 ++
 .../user-guide/latest/compatibility/index.md       |   1 +
 .../source/user-guide/latest/compatibility/json.md |  56 ++++++++++
 docs/source/user-guide/latest/expressions.md       |   2 +-
 docs/source/user-guide/latest/index.rst            |   1 +
 .../comet/codegen/CometBatchKernelCodegen.scala    |  28 +++--
 .../org/apache/comet/serde/CometScalaUDF.scala     |  16 ++-
 .../main/scala/org/apache/comet/serde/json.scala   |  34 ++++--
 .../scala/org/apache/comet/serde/strings.scala     |  43 ++++----
 .../scala/org/apache/comet/serde/structs.scala     | 114 ++++++++++-----------
 ...k_reasons.sql => json_array_length_default.sql} |  18 +++-
 .../expressions/string/get_json_object.sql         |   2 +
 .../expressions/struct/structs_to_json.sql         |  77 ++++++++++++--
 .../org/apache/comet/CometCodegenSourceSuite.scala |  11 +-
 .../apache/comet/CometJsonExpressionSuite.scala    |  16 +--
 .../scala/org/apache/comet/CometJsonJvmSuite.scala |  70 +++++++++++++
 18 files changed, 364 insertions(+), 133 deletions(-)

diff --git a/.github/workflows/pr_build_linux.yml 
b/.github/workflows/pr_build_linux.yml
index 68025fd7ba..1cf91828c1 100644
--- a/.github/workflows/pr_build_linux.yml
+++ b/.github/workflows/pr_build_linux.yml
@@ -377,6 +377,7 @@ jobs:
               org.apache.comet.CometMapExpressionSuite
               org.apache.comet.CometCsvExpressionSuite
               org.apache.comet.CometJsonExpressionSuite
+              org.apache.comet.CometJsonJvmSuite
               org.apache.comet.SparkErrorConverterSuite
               org.apache.comet.expressions.conditional.CometIfSuite
               org.apache.comet.expressions.conditional.CometCoalesceSuite
diff --git a/.github/workflows/pr_build_macos.yml 
b/.github/workflows/pr_build_macos.yml
index d0a03eeb75..41285dc8fd 100644
--- a/.github/workflows/pr_build_macos.yml
+++ b/.github/workflows/pr_build_macos.yml
@@ -193,6 +193,7 @@ jobs:
               org.apache.comet.CometMapExpressionSuite
               org.apache.comet.CometCsvExpressionSuite
               org.apache.comet.CometJsonExpressionSuite
+              org.apache.comet.CometJsonJvmSuite
               org.apache.comet.SparkErrorConverterSuite
               org.apache.comet.expressions.conditional.CometIfSuite
               org.apache.comet.expressions.conditional.CometCoalesceSuite
diff --git a/docs/source/contributor-guide/expression-audits/json_funcs.md 
b/docs/source/contributor-guide/expression-audits/json_funcs.md
index 4184ceac22..689142e2a3 100644
--- a/docs/source/contributor-guide/expression-audits/json_funcs.md
+++ b/docs/source/contributor-guide/expression-audits/json_funcs.md
@@ -33,6 +33,12 @@
 - Spark 4.1.1 (audited 2026-05-27): identical to 4.0.1.
 - Known incompatibility: Spark accepts single-quoted JSON and unescaped 
control characters; Comet's native parser (built on `serde_json`) rejects both, 
so those inputs require 
`spark.comet.expression.GetJsonObject.allowIncompatible=true` and may still 
produce different results. Non-default Spark 4.0 string collations are not 
propagated (https://github.com/apache/datafusion-comet/issues/2190).
 
+## json_array_length
+
+- `LengthOfJsonArray`: `UnaryExpression with ExpectsInputTypes with 
CodegenFallback`; `inputTypes = Seq(StringType) -> IntegerType`. Returns NULL 
for NULL input, invalid JSON, or non-array JSON; otherwise the number of 
top-level array elements.
+- Runs through the codegen dispatcher by default for byte-exact Spark 
compatibility.
+- Known incompatibility: the native path (built on `serde_json`) requires 
strict JSON, so single-quoted JSON, unescaped control characters, and trailing 
content require 
`spark.comet.expression.LengthOfJsonArray.allowIncompatible=true` and may still 
produce different results.
+
 ## to_json
 
 - Partial native support; options and map/array inputs fall back.
diff --git a/docs/source/user-guide/latest/compatibility/index.md 
b/docs/source/user-guide/latest/compatibility/index.md
index 542d6902ec..46578d1683 100644
--- a/docs/source/user-guide/latest/compatibility/index.md
+++ b/docs/source/user-guide/latest/compatibility/index.md
@@ -28,4 +28,5 @@ This guide documents areas where Comet's behavior is known to 
differ from Spark.
 - **Regular expressions**: differences between the Rust regexp crate and 
Java's regex engine.
 - **Operators**: operator-level compatibility notes, including window 
functions and round-robin partitioning.
 - **Expressions**: per-expression compatibility notes, including cast.
+- **JSON**: choosing between the native and Spark-compatible engines for JSON 
expressions.
 - **Spark versions**: version-specific known issues and limitations.
diff --git a/docs/source/user-guide/latest/compatibility/json.md 
b/docs/source/user-guide/latest/compatibility/json.md
new file mode 100644
index 0000000000..2dc79c27a8
--- /dev/null
+++ b/docs/source/user-guide/latest/compatibility/json.md
@@ -0,0 +1,56 @@
+<!---
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# JSON Compatibility
+
+Comet can evaluate JSON expressions (`get_json_object`, `from_json`, `to_json`,
+`json_array_length`) two ways:
+
+- **Codegen dispatcher (default):** Spark's own `doGenCode` for the expression
+  runs inside the Comet pipeline (via Comet's Arrow-direct codegen dispatcher),
+  giving byte-exact compatibility with Spark at the cost of a JNI roundtrip per
+  batch. This rides the codegen dispatcher
+  (`spark.comet.exec.scalaUDF.codegen.enabled`, enabled by default); if the
+  dispatcher is disabled, the operator falls back to Spark.
+- **Native (rust) path:** the native DataFusion implementation. Faster, but has
+  known compatibility gaps with Spark on certain inputs, so it is **opt-in per
+  expression** via the expression's `allowIncompatible` config. Any expression 
or
+  input case with no native implementation falls back to the codegen 
dispatcher.
+
+## Expression coverage
+
+| SQL                 | Native (rust) path                                     
                                        | Opt-in config                         
                       |
+| ------------------- | 
----------------------------------------------------------------------------------------------
 | ------------------------------------------------------------ |
+| `get_json_object`   | Supported, with gaps on single-quoted JSON and 
unescaped control characters                    | 
`spark.comet.expression.GetJsonObject.allowIncompatible`     |
+| `from_json`         | Supported with restrictions (PERMISSIVE mode only, 
simple schema types only)                   | 
`spark.comet.expression.JsonToStructs.allowIncompatible`     |
+| `to_json`           | Supported for struct inputs only, no options           
                                        | 
`spark.comet.expression.StructsToJson.allowIncompatible`     |
+| `json_array_length` | Supported, with gaps on single-quoted JSON, unescaped 
control characters, and trailing content | 
`spark.comet.expression.LengthOfJsonArray.allowIncompatible` |
+
+When the native path is enabled but an expression or input case has no native
+implementation (for example `to_json` with map or array inputs, or `from_json`
+with an unsupported schema), Comet falls back to the codegen dispatcher for 
that
+case.
+
+## When to use the native path
+
+- You want the faster native path and your inputs avoid the known compatibility
+  gaps above.
+- Enable it per expression, for example
+  `spark.comet.expression.GetJsonObject.allowIncompatible=true`. Cases the 
native path
+  does not cover still fall back to the codegen dispatcher.
diff --git a/docs/source/user-guide/latest/expressions.md 
b/docs/source/user-guide/latest/expressions.md
index ed190bebb9..79093ddb9a 100644
--- a/docs/source/user-guide/latest/expressions.md
+++ b/docs/source/user-guide/latest/expressions.md
@@ -338,7 +338,7 @@ expression-level). The `outer` variants are wired but 
marked `Incompatible`; the
 | --- | --- | --- |
 | `from_json` | ✅ | Falls back by default; opt-in via allowIncompatible 
([audit](../../contributor-guide/expression-audits/json_funcs.md#from_json)) |
 | `get_json_object` | ✅ | Some inputs need allowIncompatible 
([audit](../../contributor-guide/expression-audits/json_funcs.md#get_json_object))
 |
-| `json_array_length` | 🔜 | tracking 
[#4098](https://github.com/apache/datafusion-comet/issues/4098) |
+| `json_array_length` | ✅ | Single-quoted/trailing JSON needs 
allowIncompatible 
([audit](../../contributor-guide/expression-audits/json_funcs.md#json_array_length))
 |
 | `json_object_keys` | 🔜 | 
[#3161](https://github.com/apache/datafusion-comet/issues/3161) |
 | `json_tuple` | 🔜 | 
[#3160](https://github.com/apache/datafusion-comet/issues/3160) |
 | `schema_of_json` | 🔜 | 
[#3163](https://github.com/apache/datafusion-comet/issues/3163) |
diff --git a/docs/source/user-guide/latest/index.rst 
b/docs/source/user-guide/latest/index.rst
index 00f770c27e..621fb9d73e 100644
--- a/docs/source/user-guide/latest/index.rst
+++ b/docs/source/user-guide/latest/index.rst
@@ -62,6 +62,7 @@ to read more.
    compatibility/regex
    compatibility/operators
    compatibility/expressions/index
+   compatibility/json
    compatibility/spark-versions
 
 .. toctree::
diff --git 
a/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegen.scala 
b/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegen.scala
index 25bd70898e..1c990835bb 100644
--- 
a/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegen.scala
+++ 
b/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegen.scala
@@ -23,7 +23,7 @@ import org.apache.arrow.vector._
 import org.apache.arrow.vector.complex.{ListVector, MapVector, StructVector}
 import org.apache.arrow.vector.types.pojo.Field
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression, 
HigherOrderFunction, LambdaFunction, Literal, NamedLambdaVariable, Unevaluable}
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression, 
Literal, Unevaluable}
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -107,9 +107,8 @@ object CometBatchKernelCodegen extends Logging with 
CometExprTraitShim {
    * back cleanly rather than crashing the Janino compile at execute time.
    *
    * Checks every `BoundReference`'s data type and the root `expr.dataType` 
against
-   * [[isSupportedDataType]], rejects aggregates / generators / 
`CodegenFallback` (other than
-   * HOFs, which are admitted), and gates total nested-field count on
-   * `spark.sql.codegen.maxFields`.
+   * [[isSupportedDataType]], rejects aggregates / generators / `Unevaluable`, 
and gates total
+   * nested-field count on `spark.sql.codegen.maxFields`.
    */
   def canHandle(boundExpr: Expression): Option[String] = {
     if (!isSupportedDataType(boundExpr.dataType)) {
@@ -127,12 +126,15 @@ object CometBatchKernelCodegen extends Logging with 
CometExprTraitShim {
         s"codegen dispatch: too many nested fields ($totalFields > " +
           s"spark.sql.codegen.maxFields=$maxFields)")
     }
-    // HOFs are `CodegenFallback` but admitted: `CodegenFallback.doGenCode` 
emits one
-    // `((Expression) references[N]).eval(row)` call site per HOF. The kernel 
dispatches to the
-    // HOF's interpreted `eval`, which mutates `NamedLambdaVariable.value` per 
element and reads
-    // the input array through the kernel's typed Arrow getters. Per-task 
`boundExpr` isolation
-    // in `CometScalaUDFCodegen.kernelCache` prevents concurrent partitions 
from racing on the
-    // lambda variable's `AtomicReference`. See `CometCodegenHOFSuite`.
+    // `CodegenFallback` expressions are admitted. `CodegenFallback.doGenCode` 
emits one
+    // `((Expression) references[N]).eval(row)` call site per expression. The 
kernel dispatches
+    // to the expression's interpreted `eval` against `row` aliased to `this`, 
so the eval reads
+    // through the kernel's typed Arrow getters. This covers 
`HigherOrderFunction` (which mutates
+    // `NamedLambdaVariable.value` per element; see `CometCodegenHOFSuite`) as 
well as other
+    // CodegenFallback expressions like `JsonToStructs` / `StructsToJson` 
whose `eval(row)`
+    // simply calls `row.get(0, dataType)`. Per-task `boundExpr` isolation in
+    // `CometScalaUDFCodegen.kernelCache` prevents concurrent partitions from 
racing on shared
+    // state inside the expression.
     //
     // Nondeterministic / stateful expressions are accepted: each cache entry 
holds one kernel
     // instance with a single `init(partitionIndex)` call, so `Rand` / 
`MonotonicallyIncreasingID`
@@ -150,10 +152,6 @@ object CometBatchKernelCodegen extends Logging with 
CometExprTraitShim {
     boundExpr.find {
       case _: 
org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction => true
       case _: org.apache.spark.sql.catalyst.expressions.Generator => true
-      case _: HigherOrderFunction => false
-      case _: LambdaFunction => false
-      case _: NamedLambdaVariable => false
-      case _: CodegenFallback => true
       case u: Unevaluable if isCodegenInertUnevaluable(u) => false
       case _: Unevaluable => true
       case _ => false
@@ -161,7 +159,7 @@ object CometBatchKernelCodegen extends Logging with 
CometExprTraitShim {
       case Some(bad) =>
         return Some(
           s"codegen dispatch: expression ${bad.getClass.getSimpleName} not 
supported " +
-            "(aggregate, generator, codegen-fallback, or unevaluable)")
+            "(aggregate, generator, or unevaluable)")
       case None =>
     }
     val badRef = boundExpr.collectFirst {
diff --git a/spark/src/main/scala/org/apache/comet/serde/CometScalaUDF.scala 
b/spark/src/main/scala/org/apache/comet/serde/CometScalaUDF.scala
index a1d5be84ff..3df275f057 100644
--- a/spark/src/main/scala/org/apache/comet/serde/CometScalaUDF.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/CometScalaUDF.scala
@@ -20,7 +20,7 @@
 package org.apache.comet.serde
 
 import org.apache.spark.SparkEnv
-import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, AttributeSeq, BindReferences, Expression, Literal, ScalaUDF}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, AttributeSeq, BindReferences, Expression, Literal, 
RuntimeReplaceable, ScalaUDF}
 import org.apache.spark.sql.types.BinaryType
 
 import org.apache.comet.CometConf
@@ -78,10 +78,20 @@ object CometScalaUDF extends CometExpressionSerde[ScalaUDF] 
{
       return None
     }
 
+    // `RuntimeReplaceable` expressions (e.g. Spark 4's `StructsToJson`) have 
a `doGenCode` that
+    // always throws "Cannot generate code for expression". Catalyst's 
`ReplaceExpressions` rule
+    // normally rewrites them to their `replacement` form before codegen runs. 
Comet's serde
+    // sometimes works with the pre-rewrite form (via shim reconstruction) for 
matching purposes,
+    // so unwrap to the replacement here before binding so the kernel compiles.
+    val target = expr match {
+      case rr: RuntimeReplaceable => rr.replacement
+      case other => other
+    }
+
     // Bind against only the AttributeReferences the tree actually reads, so 
ordinals align with
     // the data args we ship.
-    val attrs = expr.collect { case a: AttributeReference => a }.distinct
-    val boundExpr = BindReferences.bindReference(expr, AttributeSeq(attrs))
+    val attrs = target.collect { case a: AttributeReference => a }.distinct
+    val boundExpr = BindReferences.bindReference(target, AttributeSeq(attrs))
 
     // Gate at plan time. Surface the reason via withFallbackReason rather 
than crashing Janino
     // at execute.
diff --git a/spark/src/main/scala/org/apache/comet/serde/json.scala 
b/spark/src/main/scala/org/apache/comet/serde/json.scala
index 5f296599d6..0eac414d13 100644
--- a/spark/src/main/scala/org/apache/comet/serde/json.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/json.scala
@@ -19,18 +19,30 @@
 
 package org.apache.comet.serde
 
-import org.apache.spark.sql.catalyst.expressions.LengthOfJsonArray
+import org.apache.spark.sql.catalyst.expressions.{Attribute, LengthOfJsonArray}
 
-object CometLengthOfJsonArray
-    extends CometScalarFunction[LengthOfJsonArray]("json_array_length") {
+import org.apache.comet.CometConf
+import org.apache.comet.serde.ExprOuterClass.Expr
+import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, 
optExprWithFallbackReason, scalarFunctionExprToProto}
 
-  private val IncompatibleReason: String =
-    "Spark's lenient JSON parser allows single quotes, unescaped controls, " +
-      "and trailing content, " +
-      "while Comet's serde_json requires strict JSON."
-
-  override def getIncompatibleReasons(): Seq[String] = Seq(IncompatibleReason)
+/**
+ * `json_array_length` runs Spark's own implementation through the codegen 
dispatcher by default,
+ * for byte-exact results. The native (rust) path is faster but incompatible 
with Spark for
+ * single-quoted JSON, unescaped control characters, and trailing content, so 
it is opt-in via
+ * `spark.comet.expression.LengthOfJsonArray.allowIncompatible`; otherwise it 
rides the codegen
+ * dispatcher via [[CometCodegenDispatch]].
+ */
+object CometLengthOfJsonArray extends CometCodegenDispatch[LengthOfJsonArray] {
 
-  override def getSupportLevel(expr: LengthOfJsonArray): SupportLevel = 
Incompatible(
-    Some(IncompatibleReason))
+  override def convert(
+      expr: LengthOfJsonArray,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[Expr] =
+    if (CometConf.isExprAllowIncompat(getExprConfigName(expr))) {
+      val childExpr = expr.children.map(exprToProtoInternal(_, inputs, 
binding))
+      val optExpr = scalarFunctionExprToProto("json_array_length", childExpr: 
_*)
+      optExprWithFallbackReason(optExpr, expr, expr.children: _*)
+    } else {
+      super.convert(expr, inputs, binding)
+    }
 }
diff --git a/spark/src/main/scala/org/apache/comet/serde/strings.scala 
b/spark/src/main/scala/org/apache/comet/serde/strings.scala
index fb23ca87d6..ee961a597f 100644
--- a/spark/src/main/scala/org/apache/comet/serde/strings.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/strings.scala
@@ -480,31 +480,32 @@ object CometStringSplit extends 
CometExpressionSerde[StringSplit] {
   }
 }
 
-object CometGetJsonObject extends CometExpressionSerde[GetJsonObject] {
-
-  private val incompatReason =
-    "Spark allows single-quoted JSON and unescaped control characters which 
Comet does not" +
-      " support"
-
-  override def getIncompatibleReasons(): Seq[String] = Seq(incompatReason)
-
-  override def getSupportLevel(expr: GetJsonObject): SupportLevel =
-    Incompatible(Some(incompatReason))
+/**
+ * `get_json_object` runs Spark's own implementation through the codegen 
dispatcher by default,
+ * for byte-exact results. The native (rust) path is faster but incompatible 
with Spark for
+ * single-quoted JSON and unescaped control characters, so it is opt-in via
+ * `spark.comet.expression.GetJsonObject.allowIncompatible`; otherwise it 
rides the codegen
+ * dispatcher via [[CometCodegenDispatch]].
+ */
+object CometGetJsonObject extends CometCodegenDispatch[GetJsonObject] {
 
   override def convert(
       expr: GetJsonObject,
       inputs: Seq[Attribute],
-      binding: Boolean): Option[Expr] = {
-    val jsonExpr = exprToProtoInternal(expr.json, inputs, binding)
-    val pathExpr = exprToProtoInternal(expr.path, inputs, binding)
-    val optExpr = scalarFunctionExprToProtoWithReturnType(
-      "get_json_object",
-      expr.dataType,
-      false,
-      jsonExpr,
-      pathExpr)
-    optExprWithFallbackReason(optExpr, expr, expr.json, expr.path)
-  }
+      binding: Boolean): Option[Expr] =
+    if (CometConf.isExprAllowIncompat(getExprConfigName(expr))) {
+      val jsonExpr = exprToProtoInternal(expr.json, inputs, binding)
+      val pathExpr = exprToProtoInternal(expr.path, inputs, binding)
+      val optExpr = scalarFunctionExprToProtoWithReturnType(
+        "get_json_object",
+        expr.dataType,
+        false,
+        jsonExpr,
+        pathExpr)
+      optExprWithFallbackReason(optExpr, expr, expr.json, expr.path)
+    } else {
+      super.convert(expr, inputs, binding)
+    }
 }
 
 trait CommonStringExprs {
diff --git a/spark/src/main/scala/org/apache/comet/serde/structs.scala 
b/spark/src/main/scala/org/apache/comet/serde/structs.scala
index 3c7dc17844..c38b12bb6d 100644
--- a/spark/src/main/scala/org/apache/comet/serde/structs.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/structs.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, 
CreateNamedStruct,
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
+import org.apache.comet.CometConf
 import org.apache.comet.CometSparkSessionExtensions.withFallbackReason
 import org.apache.comet.DataTypeSupport
 import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, 
serializeDataType}
@@ -113,42 +114,45 @@ object CometGetArrayStructFields extends 
CometExpressionSerde[GetArrayStructFiel
   }
 }
 
-object CometStructsToJson extends CometExpressionSerde[StructsToJson] {
+/**
+ * `to_json` runs Spark's own implementation through the codegen dispatcher by 
default, for
+ * byte-exact compatibility. The native (rust) path is faster but only covers 
struct inputs of
+ * supported types with no options, so it is opt-in via
+ * `spark.comet.expression.StructsToJson.allowIncompatible`; any case it does 
not cover
+ * (unsupported types or options) falls through to the codegen dispatcher via
+ * [[CometCodegenDispatch]].
+ */
+object CometStructsToJson extends CometCodegenDispatch[StructsToJson] {
 
-  override def getSupportLevel(expr: StructsToJson): SupportLevel = {
-    if (expr.options.nonEmpty) {
-      return Unsupported(Some("StructsToJson with options is not supported"))
-    }
-    val dataType = expr.child.dataType
-    if (!isSupportedType(dataType)) {
-      return Unsupported(Some(s"Struct type: $dataType contains unsupported 
types"))
-    }
-    Compatible()
-  }
+  private def nativeSupported(expr: StructsToJson): Boolean =
+    expr.options.isEmpty && isSupportedType(expr.child.dataType)
 
   override def convert(
       expr: StructsToJson,
       inputs: Seq[Attribute],
-      binding: Boolean): Option[ExprOuterClass.Expr] = {
-    val ignoreNullFields = SQLConf.get.jsonGeneratorIgnoreNullFields
-    exprToProtoInternal(expr.child, inputs, binding) match {
-      case Some(p) =>
-        val toJson = ExprOuterClass.ToJson
-          .newBuilder()
-          .setChild(p)
-          .setTimezone(expr.timeZoneId.getOrElse("UTC"))
-          .setIgnoreNullFields(ignoreNullFields)
-          .build()
-        Some(
-          ExprOuterClass.Expr
+      binding: Boolean): Option[ExprOuterClass.Expr] =
+    if (CometConf.isExprAllowIncompat(getExprConfigName(expr)) && 
nativeSupported(expr)) {
+      val ignoreNullFields = SQLConf.get.jsonGeneratorIgnoreNullFields
+      exprToProtoInternal(expr.child, inputs, binding) match {
+        case Some(p) =>
+          val toJson = ExprOuterClass.ToJson
             .newBuilder()
-            .setToJson(toJson)
-            .build())
-      case _ =>
-        withFallbackReason(expr, expr.child)
-        None
+            .setChild(p)
+            .setTimezone(expr.timeZoneId.getOrElse("UTC"))
+            .setIgnoreNullFields(ignoreNullFields)
+            .build()
+          Some(
+            ExprOuterClass.Expr
+              .newBuilder()
+              .setToJson(toJson)
+              .build())
+        case _ =>
+          withFallbackReason(expr, expr.child)
+          None
+      }
+    } else {
+      super.convert(expr, inputs, binding)
     }
-  }
 
   def isSupportedType(dt: DataType): Boolean = {
     dt match {
@@ -170,43 +174,24 @@ object CometStructsToJson extends 
CometExpressionSerde[StructsToJson] {
   }
 }
 
-object CometJsonToStructs extends CometExpressionSerde[JsonToStructs] {
-
-  override def getIncompatibleReasons(): Seq[String] = Seq(
-    "Partially implemented and not comprehensively tested")
-
-  override def getUnsupportedReasons(): Seq[String] = Seq("Requires an 
explicit schema")
+/**
+ * `from_json` runs Spark's own implementation through the codegen dispatcher 
by default. The
+ * native (rust) path is partially implemented and not comprehensively tested, 
so it is opt-in via
+ * `spark.comet.expression.JsonToStructs.allowIncompatible` and only for 
schemas it supports; any
+ * other case falls through to the codegen dispatcher via 
[[CometCodegenDispatch]].
+ */
+object CometJsonToStructs extends CometCodegenDispatch[JsonToStructs] {
 
-  override def getSupportLevel(expr: JsonToStructs): SupportLevel = {
-    // this feature is partially implemented and not comprehensively tested yet
-    Incompatible()
-  }
+  private def nativeSupported(expr: JsonToStructs): Boolean =
+    expr.schema != null && isSupportedSchema(expr.schema)
 
   override def convert(
       expr: JsonToStructs,
       inputs: Seq[Attribute],
       binding: Boolean): Option[ExprOuterClass.Expr] = {
 
-    if (expr.schema == null) {
-      withFallbackReason(expr, "from_json requires explicit schema")
-      return None
-    }
-
-    def isSupportedType(dt: DataType): Boolean = {
-      dt match {
-        case StructType(fields) =>
-          fields.nonEmpty && fields.forall(f => isSupportedType(f.dataType))
-        case DataTypes.IntegerType | DataTypes.LongType | DataTypes.FloatType |
-            DataTypes.DoubleType | DataTypes.BooleanType | 
DataTypes.StringType =>
-          true
-        case _ => false
-      }
-    }
-
-    val schemaType = expr.schema
-    if (!isSupportedType(schemaType)) {
-      withFallbackReason(expr, "from_json: Unsupported schema type")
-      return None
+    if (!(CometConf.isExprAllowIncompat(getExprConfigName(expr)) && 
nativeSupported(expr))) {
+      return super.convert(expr, inputs, binding)
     }
 
     val options = expr.options
@@ -228,7 +213,7 @@ object CometJsonToStructs extends 
CometExpressionSerde[JsonToStructs] {
     // Convert child expression and schema to protobuf
     for {
       childProto <- exprToProtoInternal(expr.child, inputs, binding)
-      schemaProto <- serializeDataType(schemaType)
+      schemaProto <- serializeDataType(expr.schema)
     } yield {
       val fromJson = ExprOuterClass.FromJson
         .newBuilder()
@@ -239,6 +224,15 @@ object CometJsonToStructs extends 
CometExpressionSerde[JsonToStructs] {
       ExprOuterClass.Expr.newBuilder().setFromJson(fromJson).build()
     }
   }
+
+  private def isSupportedSchema(dt: DataType): Boolean = dt match {
+    case StructType(fields) =>
+      fields.nonEmpty && fields.forall(f => isSupportedSchema(f.dataType))
+    case DataTypes.IntegerType | DataTypes.LongType | DataTypes.FloatType | 
DataTypes.DoubleType |
+        DataTypes.BooleanType | DataTypes.StringType =>
+      true
+    case _ => false
+  }
 }
 
 object CometStructsToCsv extends CometExpressionSerde[StructsToCsv] {
diff --git 
a/spark/src/test/resources/sql-tests/expressions/json/json_array_length_fallback_reasons.sql
 
b/spark/src/test/resources/sql-tests/expressions/json/json_array_length_default.sql
similarity index 51%
rename from 
spark/src/test/resources/sql-tests/expressions/json/json_array_length_fallback_reasons.sql
rename to 
spark/src/test/resources/sql-tests/expressions/json/json_array_length_default.sql
index b1775368c3..0471bf8471 100644
--- 
a/spark/src/test/resources/sql-tests/expressions/json/json_array_length_fallback_reasons.sql
+++ 
b/spark/src/test/resources/sql-tests/expressions/json/json_array_length_default.sql
@@ -15,7 +15,21 @@
 -- specific language governing permissions and limitations
 -- under the License.
 
+-- Default engine: json_array_length runs Spark's own implementation through 
the codegen
+-- dispatcher, so it stays in the Comet pipeline while matching Spark 
byte-for-byte, including for
+-- inputs the native (rust) path cannot handle compatibly. The native path is 
opt-in via
+-- spark.comet.expression.LengthOfJsonArray.allowIncompatible=true (see 
json_array_length.sql).
+
 -- Config: spark.comet.expression.LengthOfJsonArray.allowIncompatible=false
 
-query expect_fallback(Spark's lenient JSON parser allows single quotes, 
unescaped controls, and trailing content, while Comet's serde_json requires 
strict JSON.)
-SELECT json_array_length("[{'key':'value'}]")
\ No newline at end of file
+-- single-quoted JSON: the native serde_json path is incompatible here, but 
Spark's lenient parser
+-- (run via the dispatcher) accepts it, so the default path matches Spark.
+query
+SELECT json_array_length("[{'key':'value'}]")
+
+-- trailing content: likewise handled compatibly by the default path.
+query
+SELECT json_array_length('[1,2,3] trailing')
+
+query
+SELECT json_array_length('[1,2,3,4]'), json_array_length('not an array'), 
json_array_length(NULL)
diff --git 
a/spark/src/test/resources/sql-tests/expressions/string/get_json_object.sql 
b/spark/src/test/resources/sql-tests/expressions/string/get_json_object.sql
index edbebbdb77..ab21d4d375 100644
--- a/spark/src/test/resources/sql-tests/expressions/string/get_json_object.sql
+++ b/spark/src/test/resources/sql-tests/expressions/string/get_json_object.sql
@@ -15,6 +15,8 @@
 -- specific language governing permissions and limitations
 -- under the License.
 
+-- Opt in to the native (rust) get_json_object path. Without this, 
get_json_object runs through the
+-- codegen dispatcher (Spark's own implementation) instead.
 -- Config: spark.comet.expression.GetJsonObject.allowIncompatible=true
 -- ConfigMatrix: parquet.enable.dictionary=false,true
 
diff --git 
a/spark/src/test/resources/sql-tests/expressions/struct/structs_to_json.sql 
b/spark/src/test/resources/sql-tests/expressions/struct/structs_to_json.sql
index 3cfd636d25..b6520d61f8 100644
--- a/spark/src/test/resources/sql-tests/expressions/struct/structs_to_json.sql
+++ b/spark/src/test/resources/sql-tests/expressions/struct/structs_to_json.sql
@@ -15,23 +15,82 @@
 -- specific language governing permissions and limitations
 -- under the License.
 
+-- Opt in to the native (rust) to_json path. Without this, to_json runs 
through the codegen
+-- dispatcher (Spark's own implementation) instead.
+-- Config: spark.comet.expression.StructsToJson.allowIncompatible=true
+-- ignoreNullFields changes whether null struct fields are emitted, so 
exercise both values.
 -- ConfigMatrix: spark.sql.jsonGenerator.ignoreNullFields=false,true
 
 statement
-CREATE TABLE test_to_json(a int, b string, f float, d double) USING parquet
+CREATE TABLE test_to_json(
+  i int, s string, f float, d double, bl boolean, bt tinyint, sh smallint, lng 
bigint)
+USING parquet
 
 statement
-INSERT INTO test_to_json VALUES (1, 'hello', cast('NaN' as float), 
cast('Infinity' as double)), (NULL, NULL, cast('NaN' as float), cast('Infinity' 
as double)), (NULL, NULL, NULL, NULL), (0, '', 0.0, 0.0), (2, 'hello', 
cast('NaN' as float), cast('-Infinity' as double))
+INSERT INTO test_to_json VALUES
+  (1, 'hello', cast('NaN' as float), cast('Infinity' as double), true, 
cast(127 as tinyint), cast(32767 as smallint), 9223372036854775807),
+  (NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL),
+  (0, '', cast(0.0 as float), 0.0, false, cast(0 as tinyint), cast(0 as 
smallint), 0),
+  (-2147483648, 'say "hi"', cast('-Infinity' as float), cast('-Infinity' as 
double), true, cast(-128 as tinyint), cast(-32768 as smallint), 
-9223372036854775808),
+  (2147483647, 'café 中文', cast('NaN' as float), cast('NaN' as double), false, 
cast(-1 as tinyint), cast(-1 as smallint), -1)
 
+-- every natively-supported type as a top-level struct field
 query
-SELECT to_json(named_struct('a', a, 'b', b, 'f', f, 'd', d)) FROM test_to_json
+SELECT to_json(named_struct(
+  'i', i, 's', s, 'f', f, 'd', d, 'bl', bl, 'bt', bt, 'sh', sh, 'lng', lng)) 
FROM test_to_json
 
--- literal arguments
+-- each type individually, column argument
 query
-SELECT to_json(named_struct('a', 1, 'b', 'hello'))
+SELECT to_json(named_struct('i', i)) FROM test_to_json
 
-query expect_fallback(StructsToJson with options is not supported)
-SELECT to_json(named_struct('a', a, 'b', b), map('timestampFormat', 
'dd/MM/yyyy')) FROM test_to_json
+query
+SELECT to_json(named_struct('s', s)) FROM test_to_json
+
+query
+SELECT to_json(named_struct('f', f)) FROM test_to_json
+
+query
+SELECT to_json(named_struct('d', d)) FROM test_to_json
+
+query
+SELECT to_json(named_struct('bl', bl)) FROM test_to_json
+
+query
+SELECT to_json(named_struct('bt', bt)) FROM test_to_json
+
+query
+SELECT to_json(named_struct('sh', sh)) FROM test_to_json
+
+query
+SELECT to_json(named_struct('lng', lng)) FROM test_to_json
+
+-- nested struct of supported types
+query
+SELECT to_json(named_struct(
+  'inner', named_struct('i', i, 's', s, 'bl', bl),
+  'lng', lng)) FROM test_to_json
+
+-- deeply nested struct
+query
+SELECT to_json(named_struct(
+  'a', named_struct('b', named_struct('c', i, 'd', s)))) FROM test_to_json
+
+-- literal arguments covering each type
+query
+SELECT to_json(named_struct(
+  'i', 1, 's', 'hello', 'f', cast(1.5 as float), 'd', 2.5,
+  'bl', true, 'bt', cast(7 as tinyint), 'sh', cast(700 as smallint), 'lng', 
9000000000))
+
+-- literal struct with a value that requires JSON string escaping
+query
+SELECT to_json(named_struct('s', 'quote " and slash \\ and tab\there'))
 
-query expect_fallback(Struct type: 
StructType(StructField(a,IntegerType,true),StructField(b,ArrayType(StringType,true),false))
 contains unsupported types)
-SELECT to_json(named_struct('a', a, 'b', array(b))) FROM test_to_json
+-- to_json with options is not supported by the native (rust) path, so it 
routes to the codegen
+-- dispatcher, which runs Spark's own implementation inside Comet (still 
native Comet execution).
+query
+SELECT to_json(named_struct('i', i, 's', s), map('timestampFormat', 
'dd/MM/yyyy')) FROM test_to_json
+
+-- to_json with an array field is not supported by the native (rust) path, so 
it likewise routes to
+-- the codegen dispatcher.
+query
+SELECT to_json(named_struct('i', i, 'arr', array(s))) FROM test_to_json
diff --git 
a/spark/src/test/scala/org/apache/comet/CometCodegenSourceSuite.scala 
b/spark/src/test/scala/org/apache/comet/CometCodegenSourceSuite.scala
index b9c5175117..9416861da4 100644
--- a/spark/src/test/scala/org/apache/comet/CometCodegenSourceSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometCodegenSourceSuite.scala
@@ -166,13 +166,16 @@ class CometCodegenSourceSuite extends AnyFunSuite {
         s"got:\n$src")
   }
 
-  test("canHandle rejects CodegenFallback expressions") {
+  test("canHandle accepts CodegenFallback expressions (delegates to 
eval(row))") {
+    // CodegenFallback.doGenCode emits ((Expression) references[N]).eval(row) 
which is the same
+    // mechanism that backs HigherOrderFunction support: the eval reads 
through the kernel's typed
+    // Arrow getters via the row alias. Other CodegenFallback expressions 
(JsonToStructs,
+    // StructsToJson, ...) ride the same path.
     val expr = FakeCodegenFallback(BoundReference(0, StringType, nullable = 
true))
     val reason = CometBatchKernelCodegen.canHandle(expr)
-    assert(reason.isDefined, "expected canHandle to reject CodegenFallback")
     assert(
-      reason.get.contains("FakeCodegenFallback"),
-      s"expected reason to name the rejected expression class; got: 
${reason.get}")
+      reason.isEmpty,
+      s"expected canHandle to accept CodegenFallback; got rejection: 
${reason.getOrElse("")}")
   }
 
   test("canHandle accepts Nondeterministic expressions (per-partition kernel 
handles state)") {
diff --git 
a/spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala 
b/spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala
index c3446f6264..2356a985ef 100644
--- a/spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala
@@ -39,6 +39,8 @@ class CometJsonExpressionSuite extends CometTestBase with 
AdaptiveSparkPlanHelpe
       pos: Position): Unit = {
     super.test(testName, testTags: _*) {
       withSQLConf(
+        // This suite exercises the native (rust) JSON path, which is opt-in 
per expression via
+        // `allowIncompatible`. By default these expressions run through the 
codegen dispatcher.
         CometConf.getExprAllowIncompatConfigKey(classOf[JsonToStructs]) -> 
"true",
         CometConf.getExprAllowIncompatConfigKey(classOf[StructsToJson]) -> 
"true") {
         testFun
@@ -70,16 +72,16 @@ class CometJsonExpressionSuite extends CometTestBase with 
AdaptiveSparkPlanHelpe
     }
   }
 
-  test("to_json - fallback reasons") {
+  test("to_json - options and unsupported native types route to the codegen 
engine") {
     withTable("t") {
       sql("CREATE TABLE t(a INT, b STRING) USING parquet")
       sql("INSERT INTO t VALUES (1, 'hello')")
-      checkSparkAnswerAndFallbackReason(
-        "SELECT to_json(named_struct('a', a, 'b', b), map('timestampFormat', 
'dd/MM/yyyy')) FROM t",
-        "StructsToJson with options is not supported")
-      checkSparkAnswerAndFallbackReason(
-        "SELECT to_json(named_struct('b', array(b))) FROM t",
-        "Struct type: 
StructType(StructField(b,ArrayType(StringType,true),false)) contains 
unsupported types")
+      // The native (rust) path does not support to_json with options or 
array/map types. Even with
+      // allowIncompatible enabled, these cases fall back to the codegen 
dispatcher, which runs
+      // Spark's own implementation inside the Comet pipeline rather than 
falling back to Spark.
+      checkSparkAnswerAndOperator(
+        "SELECT to_json(named_struct('a', a, 'b', b), map('timestampFormat', 
'dd/MM/yyyy')) FROM t")
+      checkSparkAnswerAndOperator("SELECT to_json(named_struct('b', array(b))) 
FROM t")
     }
   }
 
diff --git a/spark/src/test/scala/org/apache/comet/CometJsonJvmSuite.scala 
b/spark/src/test/scala/org/apache/comet/CometJsonJvmSuite.scala
new file mode 100644
index 0000000000..4a4df2e9b1
--- /dev/null
+++ b/spark/src/test/scala/org/apache/comet/CometJsonJvmSuite.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.CometTestBase
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+
+class CometJsonJvmSuite extends CometTestBase with AdaptiveSparkPlanHelper {
+
+  // No per-expression `allowIncompatible` is set, so all JSON expressions run 
through the
+  // codegen dispatcher (Spark's own code) rather than the native rust path.
+  override protected def sparkConf: SparkConf =
+    super.sparkConf
+      .set(CometConf.COMET_SCALA_UDF_CODEGEN_ENABLED.key, "true")
+
+  private val rows = Seq(
+    """{"a":1,"b":"x","arr":[1,2,3]}""",
+    """{"a":2,"b":"y","arr":[]}""",
+    """{"a":null,"b":"z","arr":[10]}""",
+    null,
+    """not json""")
+
+  private def withJsonTable(f: => Unit): Unit = {
+    withTable("t") {
+      sql("CREATE TABLE t (j STRING) USING parquet")
+      val sqlRows = rows
+        .map(v => if (v == null) "(NULL)" else s"('${v.replace("'", "''")}')")
+        .mkString(", ")
+      sql(s"INSERT INTO t VALUES $sqlRows")
+      f
+    }
+  }
+
+  test("get_json_object via JVM engine") {
+    withJsonTable {
+      checkSparkAnswerAndOperator(sql("SELECT get_json_object(j, '$.a') FROM 
t"))
+      checkSparkAnswerAndOperator(sql("SELECT get_json_object(j, '$.b') FROM 
t"))
+    }
+  }
+
+  test("from_json with explicit schema via JVM engine") {
+    withJsonTable {
+      checkSparkAnswerAndOperator(sql("SELECT from_json(j, 'a INT, b STRING') 
FROM t"))
+    }
+  }
+
+  test("to_json round-trip via JVM engine") {
+    withJsonTable {
+      checkSparkAnswerAndOperator(sql("SELECT to_json(from_json(j, 'a INT, b 
STRING')) FROM t"))
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to