This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new fd53edb2f feat: Add partial support for `from_json` (#2934)
fd53edb2f is described below

commit fd53edb2f00c5a25415fef76da3c47e00ec3c6a7
Author: Andy Grove <[email protected]>
AuthorDate: Sat Dec 20 08:38:43 2025 -0700

    feat: Add partial support for `from_json` (#2934)
---
 .github/workflows/pr_build_linux.yml               |   1 +
 .github/workflows/pr_build_macos.yml               |   1 +
 docs/source/user-guide/latest/configs.md           |   1 +
 native/Cargo.lock                                  |   1 +
 native/core/src/execution/expressions/strings.rs   |  27 +-
 .../src/execution/planner/expression_registry.rs   |   4 +
 native/proto/src/proto/expr.proto                  |   7 +
 native/spark-expr/Cargo.toml                       |   1 +
 native/spark-expr/src/json_funcs/from_json.rs      | 639 +++++++++++++++++++++
 native/spark-expr/src/json_funcs/mod.rs            |   2 +
 native/spark-expr/src/lib.rs                       |   2 +-
 .../org/apache/comet/serde/QueryPlanSerde.scala    |   1 +
 .../scala/org/apache/comet/serde/structs.scala     |  68 ++-
 .../apache/comet/CometJsonExpressionSuite.scala    | 164 ++++++
 .../benchmark/CometJsonExpressionBenchmark.scala   | 183 ++++++
 15 files changed, 1098 insertions(+), 4 deletions(-)

diff --git a/.github/workflows/pr_build_linux.yml 
b/.github/workflows/pr_build_linux.yml
index e7651ec5f..e3b0e4056 100644
--- a/.github/workflows/pr_build_linux.yml
+++ b/.github/workflows/pr_build_linux.yml
@@ -161,6 +161,7 @@ jobs:
               org.apache.comet.CometStringExpressionSuite
               org.apache.comet.CometBitwiseExpressionSuite
               org.apache.comet.CometMapExpressionSuite
+              org.apache.comet.CometJsonExpressionSuite
               org.apache.comet.expressions.conditional.CometIfSuite
               org.apache.comet.expressions.conditional.CometCoalesceSuite
               org.apache.comet.expressions.conditional.CometCaseWhenSuite
diff --git a/.github/workflows/pr_build_macos.yml 
b/.github/workflows/pr_build_macos.yml
index 8fd0aab78..02d31c4f5 100644
--- a/.github/workflows/pr_build_macos.yml
+++ b/.github/workflows/pr_build_macos.yml
@@ -126,6 +126,7 @@ jobs:
               org.apache.comet.CometStringExpressionSuite
               org.apache.comet.CometBitwiseExpressionSuite
               org.apache.comet.CometMapExpressionSuite
+              org.apache.comet.CometJsonExpressionSuite
               org.apache.comet.expressions.conditional.CometIfSuite
               org.apache.comet.expressions.conditional.CometCoalesceSuite
               org.apache.comet.expressions.conditional.CometCaseWhenSuite
diff --git a/docs/source/user-guide/latest/configs.md 
b/docs/source/user-guide/latest/configs.md
index db7d2ce32..13a9c752e 100644
--- a/docs/source/user-guide/latest/configs.md
+++ b/docs/source/user-guide/latest/configs.md
@@ -264,6 +264,7 @@ These settings can be used to determine which parts of the 
plan are accelerated
 | `spark.comet.expression.IsNaN.enabled` | Enable Comet acceleration for 
`IsNaN` | true |
 | `spark.comet.expression.IsNotNull.enabled` | Enable Comet acceleration for 
`IsNotNull` | true |
 | `spark.comet.expression.IsNull.enabled` | Enable Comet acceleration for 
`IsNull` | true |
+| `spark.comet.expression.JsonToStructs.enabled` | Enable Comet acceleration 
for `JsonToStructs` | true |
 | `spark.comet.expression.KnownFloatingPointNormalized.enabled` | Enable Comet 
acceleration for `KnownFloatingPointNormalized` | true |
 | `spark.comet.expression.Length.enabled` | Enable Comet acceleration for 
`Length` | true |
 | `spark.comet.expression.LessThan.enabled` | Enable Comet acceleration for 
`LessThan` | true |
diff --git a/native/Cargo.lock b/native/Cargo.lock
index acdd27976..c28be6c54 100644
--- a/native/Cargo.lock
+++ b/native/Cargo.lock
@@ -1882,6 +1882,7 @@ dependencies = [
  "num",
  "rand 0.9.2",
  "regex",
+ "serde_json",
  "thiserror 2.0.17",
  "tokio",
  "twox-hash",
diff --git a/native/core/src/execution/expressions/strings.rs 
b/native/core/src/execution/expressions/strings.rs
index 5f4300eb1..721939596 100644
--- a/native/core/src/execution/expressions/strings.rs
+++ b/native/core/src/execution/expressions/strings.rs
@@ -25,12 +25,13 @@ use datafusion::common::ScalarValue;
 use datafusion::physical_expr::expressions::{LikeExpr, Literal};
 use datafusion::physical_expr::PhysicalExpr;
 use datafusion_comet_proto::spark_expression::Expr;
-use datafusion_comet_spark_expr::{RLike, SubstringExpr};
+use datafusion_comet_spark_expr::{FromJson, RLike, SubstringExpr};
 
 use crate::execution::{
     expressions::extract_expr,
     operators::ExecutionError,
     planner::{expression_registry::ExpressionBuilder, PhysicalPlanner},
+    serde::to_arrow_datatype,
 };
 
 /// Builder for Substring expressions
@@ -98,3 +99,27 @@ impl ExpressionBuilder for RlikeBuilder {
         }
     }
 }
+
+pub struct FromJsonBuilder;
+
+impl ExpressionBuilder for FromJsonBuilder {
+    fn build(
+        &self,
+        spark_expr: &Expr,
+        input_schema: SchemaRef,
+        planner: &PhysicalPlanner,
+    ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
+        let expr = extract_expr!(spark_expr, FromJson);
+        let child = planner.create_expr(
+            expr.child.as_ref().ok_or_else(|| {
+                ExecutionError::GeneralError("FromJson missing 
child".to_string())
+            })?,
+            input_schema,
+        )?;
+        let schema =
+            to_arrow_datatype(expr.schema.as_ref().ok_or_else(|| {
+                ExecutionError::GeneralError("FromJson missing 
schema".to_string())
+            })?);
+        Ok(Arc::new(FromJson::new(child, schema, &expr.timezone)))
+    }
+}
diff --git a/native/core/src/execution/planner/expression_registry.rs 
b/native/core/src/execution/planner/expression_registry.rs
index 3321f6118..e85fbe510 100644
--- a/native/core/src/execution/planner/expression_registry.rs
+++ b/native/core/src/execution/planner/expression_registry.rs
@@ -94,6 +94,7 @@ pub enum ExpressionType {
     CreateNamedStruct,
     GetStructField,
     ToJson,
+    FromJson,
     ToPrettyString,
     ListExtract,
     GetArrayStructFields,
@@ -281,6 +282,8 @@ impl ExpressionRegistry {
             .insert(ExpressionType::Like, Box::new(LikeBuilder));
         self.builders
             .insert(ExpressionType::Rlike, Box::new(RlikeBuilder));
+        self.builders
+            .insert(ExpressionType::FromJson, Box::new(FromJsonBuilder));
     }
 
     /// Extract expression type from Spark protobuf expression
@@ -336,6 +339,7 @@ impl ExpressionRegistry {
             Some(ExprStruct::CreateNamedStruct(_)) => 
Ok(ExpressionType::CreateNamedStruct),
             Some(ExprStruct::GetStructField(_)) => 
Ok(ExpressionType::GetStructField),
             Some(ExprStruct::ToJson(_)) => Ok(ExpressionType::ToJson),
+            Some(ExprStruct::FromJson(_)) => Ok(ExpressionType::FromJson),
             Some(ExprStruct::ToPrettyString(_)) => 
Ok(ExpressionType::ToPrettyString),
             Some(ExprStruct::ListExtract(_)) => 
Ok(ExpressionType::ListExtract),
             Some(ExprStruct::GetArrayStructFields(_)) => 
Ok(ExpressionType::GetArrayStructFields),
diff --git a/native/proto/src/proto/expr.proto 
b/native/proto/src/proto/expr.proto
index a7736f561..1c453b633 100644
--- a/native/proto/src/proto/expr.proto
+++ b/native/proto/src/proto/expr.proto
@@ -85,6 +85,7 @@ message Expr {
     Rand randn = 62;
     EmptyExpr spark_partition_id = 63;
     EmptyExpr monotonically_increasing_id = 64;
+    FromJson from_json = 89;
   }
 }
 
@@ -268,6 +269,12 @@ message ToJson {
   bool ignore_null_fields = 6;
 }
 
+message FromJson {
+  Expr child = 1;
+  DataType schema = 2;
+  string timezone = 3;
+}
+
 enum BinaryOutputStyle {
   UTF8 = 0;
   BASIC = 1;
diff --git a/native/spark-expr/Cargo.toml b/native/spark-expr/Cargo.toml
index b3a46fd91..c973a5b37 100644
--- a/native/spark-expr/Cargo.toml
+++ b/native/spark-expr/Cargo.toml
@@ -33,6 +33,7 @@ datafusion = { workspace = true }
 chrono-tz = { workspace = true }
 num = { workspace = true }
 regex = { workspace = true }
+serde_json = "1.0"
 thiserror = { workspace = true }
 futures = { workspace = true }
 twox-hash = "2.1.2"
diff --git a/native/spark-expr/src/json_funcs/from_json.rs 
b/native/spark-expr/src/json_funcs/from_json.rs
new file mode 100644
index 000000000..ebcc84b8f
--- /dev/null
+++ b/native/spark-expr/src/json_funcs/from_json.rs
@@ -0,0 +1,639 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{
+    Array, ArrayRef, BooleanBuilder, Float32Builder, Float64Builder, 
Int32Builder, Int64Builder,
+    RecordBatch, StringBuilder, StructArray,
+};
+use arrow::datatypes::{DataType, Field, Schema};
+use datafusion::common::Result;
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion::physical_plan::ColumnarValue;
+use std::any::Any;
+use std::fmt::{Debug, Display, Formatter};
+use std::sync::Arc;
+
+/// from_json function - parses JSON strings into structured types
+#[derive(Debug, Eq)]
+pub struct FromJson {
+    /// The JSON string input expression
+    expr: Arc<dyn PhysicalExpr>,
+    /// Target schema for parsing
+    schema: DataType,
+    /// Timezone for timestamp parsing (future use)
+    timezone: String,
+}
+
+impl PartialEq for FromJson {
+    fn eq(&self, other: &Self) -> bool {
+        self.expr.eq(&other.expr) && self.schema == other.schema && 
self.timezone == other.timezone
+    }
+}
+
+impl std::hash::Hash for FromJson {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.expr.hash(state);
+        // Note: DataType doesn't implement Hash, so we hash its debug 
representation
+        format!("{:?}", self.schema).hash(state);
+        self.timezone.hash(state);
+    }
+}
+
+impl FromJson {
+    pub fn new(expr: Arc<dyn PhysicalExpr>, schema: DataType, timezone: &str) 
-> Self {
+        Self {
+            expr,
+            schema,
+            timezone: timezone.to_owned(),
+        }
+    }
+}
+
+impl Display for FromJson {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "from_json({}, schema={:?}, timezone={})",
+            self.expr, self.schema, self.timezone
+        )
+    }
+}
+
+impl PartialEq<dyn Any> for FromJson {
+    fn eq(&self, other: &dyn Any) -> bool {
+        if let Some(other) = other.downcast_ref::<FromJson>() {
+            self.expr.eq(&other.expr)
+                && self.schema == other.schema
+                && self.timezone == other.timezone
+        } else {
+            false
+        }
+    }
+}
+
+impl PhysicalExpr for FromJson {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
+        unimplemented!()
+    }
+
+    fn data_type(&self, _: &Schema) -> Result<DataType> {
+        Ok(self.schema.clone())
+    }
+
+    fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
+        // Always nullable - parse errors return null in PERMISSIVE mode
+        Ok(true)
+    }
+
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
+        let input = self.expr.evaluate(batch)?.into_array(batch.num_rows())?;
+        Ok(ColumnarValue::Array(json_string_to_struct(
+            &input,
+            &self.schema,
+        )?))
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+        vec![&self.expr]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn PhysicalExpr>>,
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        assert!(children.len() == 1);
+        Ok(Arc::new(Self::new(
+            Arc::clone(&children[0]),
+            self.schema.clone(),
+            &self.timezone,
+        )))
+    }
+}
+
+/// Parse JSON string array into struct array
+fn json_string_to_struct(arr: &Arc<dyn Array>, schema: &DataType) -> 
Result<ArrayRef> {
+    use arrow::array::StringArray;
+    use arrow::buffer::NullBuffer;
+
+    let string_array = 
arr.as_any().downcast_ref::<StringArray>().ok_or_else(|| {
+        datafusion::common::DataFusionError::Execution("from_json expects 
string input".to_string())
+    })?;
+
+    let DataType::Struct(fields) = schema else {
+        return Err(datafusion::common::DataFusionError::Execution(
+            "from_json requires struct schema".to_string(),
+        ));
+    };
+
+    let num_rows = string_array.len();
+    let mut field_builders = create_field_builders(fields, num_rows)?;
+    let mut struct_nulls = vec![true; num_rows];
+    for (row_idx, struct_null) in struct_nulls.iter_mut().enumerate() {
+        if string_array.is_null(row_idx) {
+            // Null input -> null struct
+            *struct_null = false;
+            append_null_to_all_builders(&mut field_builders);
+        } else {
+            let json_str = string_array.value(row_idx);
+
+            // Parse JSON (PERMISSIVE mode: return null fields on error)
+            match serde_json::from_str::<serde_json::Value>(json_str) {
+                Ok(json_value) => {
+                    if let serde_json::Value::Object(obj) = json_value {
+                        // Struct is not null, extract each field
+                        *struct_null = true;
+                        for (field, builder) in 
fields.iter().zip(field_builders.iter_mut()) {
+                            let field_value = obj.get(field.name());
+                            append_field_value(builder, field, field_value)?;
+                        }
+                    } else {
+                        // Not an object -> struct with null fields
+                        *struct_null = true;
+                        append_null_to_all_builders(&mut field_builders);
+                    }
+                }
+                Err(_) => {
+                    // Parse error -> struct with null fields (PERMISSIVE mode)
+                    *struct_null = true;
+                    append_null_to_all_builders(&mut field_builders);
+                }
+            }
+        }
+    }
+
+    let arrays: Vec<ArrayRef> = field_builders
+        .into_iter()
+        .map(finish_builder)
+        .collect::<Result<Vec<_>>>()?;
+    let null_buffer = NullBuffer::from(struct_nulls);
+    Ok(Arc::new(StructArray::new(
+        fields.clone(),
+        arrays,
+        Some(null_buffer),
+    )))
+}
+
+/// Builder enum for different data types
+enum FieldBuilder {
+    Int32(Int32Builder),
+    Int64(Int64Builder),
+    Float32(Float32Builder),
+    Float64(Float64Builder),
+    Boolean(BooleanBuilder),
+    String(StringBuilder),
+    Struct {
+        fields: arrow::datatypes::Fields,
+        builders: Vec<FieldBuilder>,
+        null_buffer: Vec<bool>,
+    },
+}
+
+fn create_field_builders(
+    fields: &arrow::datatypes::Fields,
+    capacity: usize,
+) -> Result<Vec<FieldBuilder>> {
+    fields
+        .iter()
+        .map(|field| match field.data_type() {
+            DataType::Int32 => 
Ok(FieldBuilder::Int32(Int32Builder::with_capacity(capacity))),
+            DataType::Int64 => 
Ok(FieldBuilder::Int64(Int64Builder::with_capacity(capacity))),
+            DataType::Float32 => 
Ok(FieldBuilder::Float32(Float32Builder::with_capacity(
+                capacity,
+            ))),
+            DataType::Float64 => 
Ok(FieldBuilder::Float64(Float64Builder::with_capacity(
+                capacity,
+            ))),
+            DataType::Boolean => 
Ok(FieldBuilder::Boolean(BooleanBuilder::with_capacity(
+                capacity,
+            ))),
+            DataType::Utf8 => 
Ok(FieldBuilder::String(StringBuilder::with_capacity(
+                capacity,
+                capacity * 16,
+            ))),
+            DataType::Struct(nested_fields) => {
+                let nested_builders = create_field_builders(nested_fields, 
capacity)?;
+                Ok(FieldBuilder::Struct {
+                    fields: nested_fields.clone(),
+                    builders: nested_builders,
+                    null_buffer: Vec::with_capacity(capacity),
+                })
+            }
+            dt => Err(datafusion::common::DataFusionError::Execution(format!(
+                "Unsupported field type in from_json: {:?}",
+                dt
+            ))),
+        })
+        .collect()
+}
+
+fn append_null_to_all_builders(builders: &mut [FieldBuilder]) {
+    for builder in builders {
+        match builder {
+            FieldBuilder::Int32(b) => b.append_null(),
+            FieldBuilder::Int64(b) => b.append_null(),
+            FieldBuilder::Float32(b) => b.append_null(),
+            FieldBuilder::Float64(b) => b.append_null(),
+            FieldBuilder::Boolean(b) => b.append_null(),
+            FieldBuilder::String(b) => b.append_null(),
+            FieldBuilder::Struct {
+                builders: nested_builders,
+                null_buffer,
+                ..
+            } => {
+                // Append null to nested struct
+                null_buffer.push(false);
+                append_null_to_all_builders(nested_builders);
+            }
+        }
+    }
+}
+
+fn append_field_value(
+    builder: &mut FieldBuilder,
+    field: &Field,
+    json_value: Option<&serde_json::Value>,
+) -> Result<()> {
+    use serde_json::Value;
+
+    let value = match json_value {
+        Some(Value::Null) | None => {
+            // Missing field or explicit null -> append null
+            match builder {
+                FieldBuilder::Int32(b) => b.append_null(),
+                FieldBuilder::Int64(b) => b.append_null(),
+                FieldBuilder::Float32(b) => b.append_null(),
+                FieldBuilder::Float64(b) => b.append_null(),
+                FieldBuilder::Boolean(b) => b.append_null(),
+                FieldBuilder::String(b) => b.append_null(),
+                FieldBuilder::Struct {
+                    builders: nested_builders,
+                    null_buffer,
+                    ..
+                } => {
+                    null_buffer.push(false);
+                    append_null_to_all_builders(nested_builders);
+                }
+            }
+            return Ok(());
+        }
+        Some(v) => v,
+    };
+
+    match (builder, field.data_type()) {
+        (FieldBuilder::Int32(b), DataType::Int32) => {
+            if let Some(i) = value.as_i64() {
+                if i >= i32::MIN as i64 && i <= i32::MAX as i64 {
+                    b.append_value(i as i32);
+                } else {
+                    b.append_null(); // Overflow
+                }
+            } else {
+                b.append_null(); // Type mismatch
+            }
+        }
+        (FieldBuilder::Int64(b), DataType::Int64) => {
+            if let Some(i) = value.as_i64() {
+                b.append_value(i);
+            } else {
+                b.append_null();
+            }
+        }
+        (FieldBuilder::Float32(b), DataType::Float32) => {
+            if let Some(f) = value.as_f64() {
+                b.append_value(f as f32);
+            } else {
+                b.append_null();
+            }
+        }
+        (FieldBuilder::Float64(b), DataType::Float64) => {
+            if let Some(f) = value.as_f64() {
+                b.append_value(f);
+            } else {
+                b.append_null();
+            }
+        }
+        (FieldBuilder::Boolean(b), DataType::Boolean) => {
+            if let Some(bool_val) = value.as_bool() {
+                b.append_value(bool_val);
+            } else {
+                b.append_null();
+            }
+        }
+        (FieldBuilder::String(b), DataType::Utf8) => {
+            if let Some(s) = value.as_str() {
+                b.append_value(s);
+            } else {
+                // Stringify non-string values
+                b.append_value(value.to_string());
+            }
+        }
+        (
+            FieldBuilder::Struct {
+                fields: nested_fields,
+                builders: nested_builders,
+                null_buffer,
+            },
+            DataType::Struct(_),
+        ) => {
+            // Handle nested struct
+            if let Some(obj) = value.as_object() {
+                // Non-null nested struct
+                null_buffer.push(true);
+                for (nested_field, nested_builder) in
+                    nested_fields.iter().zip(nested_builders.iter_mut())
+                {
+                    let nested_value = obj.get(nested_field.name());
+                    append_field_value(nested_builder, nested_field, 
nested_value)?;
+                }
+            } else {
+                // Not an object -> null nested struct
+                null_buffer.push(false);
+                append_null_to_all_builders(nested_builders);
+            }
+        }
+        _ => {
+            return Err(datafusion::common::DataFusionError::Execution(
+                "Type mismatch in from_json".to_string(),
+            ));
+        }
+    }
+
+    Ok(())
+}
+
+fn finish_builder(builder: FieldBuilder) -> Result<ArrayRef> {
+    Ok(match builder {
+        FieldBuilder::Int32(mut b) => Arc::new(b.finish()),
+        FieldBuilder::Int64(mut b) => Arc::new(b.finish()),
+        FieldBuilder::Float32(mut b) => Arc::new(b.finish()),
+        FieldBuilder::Float64(mut b) => Arc::new(b.finish()),
+        FieldBuilder::Boolean(mut b) => Arc::new(b.finish()),
+        FieldBuilder::String(mut b) => Arc::new(b.finish()),
+        FieldBuilder::Struct {
+            fields,
+            builders,
+            null_buffer,
+        } => {
+            let nested_arrays: Vec<ArrayRef> = builders
+                .into_iter()
+                .map(finish_builder)
+                .collect::<Result<Vec<_>>>()?;
+            let null_buf = arrow::buffer::NullBuffer::from(null_buffer);
+            Arc::new(StructArray::new(fields, nested_arrays, Some(null_buf)))
+        }
+    })
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::array::{Int32Array, StringArray};
+    use arrow::datatypes::Fields;
+
+    #[test]
+    fn test_simple_struct() -> Result<()> {
+        let schema = DataType::Struct(Fields::from(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Utf8, true),
+        ]));
+
+        let input: Arc<dyn Array> = Arc::new(StringArray::from(vec![
+            Some(r#"{"a": 123, "b": "hello"}"#),
+            Some(r#"{"a": 456}"#),
+            Some(r#"invalid json"#),
+            None,
+        ]));
+
+        let result = json_string_to_struct(&input, &schema)?;
+        let struct_array = 
result.as_any().downcast_ref::<StructArray>().unwrap();
+
+        assert_eq!(struct_array.len(), 4);
+
+        // First row
+        let a_array = struct_array
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        assert_eq!(a_array.value(0), 123);
+        let b_array = struct_array
+            .column(1)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        assert_eq!(b_array.value(0), "hello");
+
+        // Second row (missing field b)
+        assert_eq!(a_array.value(1), 456);
+        assert!(b_array.is_null(1));
+
+        // Third row (parse error -> struct NOT null, all fields null)
+        assert!(!struct_array.is_null(2), "Struct should not be null");
+        assert!(a_array.is_null(2));
+        assert!(b_array.is_null(2));
+
+        // Fourth row (null input -> struct IS null)
+        assert!(struct_array.is_null(3), "Struct itself should be null");
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_all_primitive_types() -> Result<()> {
+        let schema = DataType::Struct(Fields::from(vec![
+            Field::new("i32", DataType::Int32, true),
+            Field::new("i64", DataType::Int64, true),
+            Field::new("f32", DataType::Float32, true),
+            Field::new("f64", DataType::Float64, true),
+            Field::new("bool", DataType::Boolean, true),
+            Field::new("str", DataType::Utf8, true),
+        ]));
+
+        let input: Arc<dyn Array> = Arc::new(StringArray::from(vec![Some(
+            
r#"{"i32":123,"i64":9999999999,"f32":1.5,"f64":2.5,"bool":true,"str":"test"}"#,
+        )]));
+
+        let result = json_string_to_struct(&input, &schema)?;
+        let struct_array = 
result.as_any().downcast_ref::<StructArray>().unwrap();
+
+        assert_eq!(struct_array.len(), 1);
+
+        // Verify all types
+        let i32_array = struct_array
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        assert_eq!(i32_array.value(0), 123);
+
+        let i64_array = struct_array
+            .column(1)
+            .as_any()
+            .downcast_ref::<arrow::array::Int64Array>()
+            .unwrap();
+        assert_eq!(i64_array.value(0), 9999999999);
+
+        let f32_array = struct_array
+            .column(2)
+            .as_any()
+            .downcast_ref::<arrow::array::Float32Array>()
+            .unwrap();
+        assert_eq!(f32_array.value(0), 1.5);
+
+        let f64_array = struct_array
+            .column(3)
+            .as_any()
+            .downcast_ref::<arrow::array::Float64Array>()
+            .unwrap();
+        assert_eq!(f64_array.value(0), 2.5);
+
+        let bool_array = struct_array
+            .column(4)
+            .as_any()
+            .downcast_ref::<arrow::array::BooleanArray>()
+            .unwrap();
+        assert!(bool_array.value(0));
+
+        let str_array = struct_array
+            .column(5)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        assert_eq!(str_array.value(0), "test");
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_empty_and_null_json() -> Result<()> {
+        let schema = DataType::Struct(Fields::from(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Utf8, true),
+        ]));
+
+        let input: Arc<dyn Array> = Arc::new(StringArray::from(vec![
+            Some(r#"{}"#),   // Empty object
+            Some(r#"null"#), // JSON null
+            Some(r#"[]"#),   // Array (not object)
+            Some(r#"123"#),  // Number (not object)
+        ]));
+
+        let result = json_string_to_struct(&input, &schema)?;
+        let struct_array = 
result.as_any().downcast_ref::<StructArray>().unwrap();
+
+        assert_eq!(struct_array.len(), 4);
+
+        let a_array = struct_array
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        let b_array = struct_array
+            .column(1)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+
+        // All rows should have non-null structs with null field values
+        for i in 0..4 {
+            assert!(
+                !struct_array.is_null(i),
+                "Row {} struct should not be null",
+                i
+            );
+            assert!(a_array.is_null(i), "Row {} field a should be null", i);
+            assert!(b_array.is_null(i), "Row {} field b should be null", i);
+        }
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_nested_struct() -> Result<()> {
+        let schema = DataType::Struct(Fields::from(vec![
+            Field::new(
+                "outer",
+                DataType::Struct(Fields::from(vec![
+                    Field::new("inner_a", DataType::Int32, true),
+                    Field::new("inner_b", DataType::Utf8, true),
+                ])),
+                true,
+            ),
+            Field::new("top_level", DataType::Int32, true),
+        ]));
+
+        let input: Arc<dyn Array> = Arc::new(StringArray::from(vec![
+            
Some(r#"{"outer":{"inner_a":123,"inner_b":"hello"},"top_level":999}"#),
+            Some(r#"{"outer":{"inner_a":456},"top_level":888}"#), // Missing 
nested field
+            Some(r#"{"outer":null,"top_level":777}"#),            // Null 
nested struct
+            Some(r#"{"top_level":666}"#),                         // Missing 
nested struct
+        ]));
+
+        let result = json_string_to_struct(&input, &schema)?;
+        let struct_array = 
result.as_any().downcast_ref::<StructArray>().unwrap();
+
+        assert_eq!(struct_array.len(), 4);
+
+        // Check outer struct
+        let outer_array = struct_array
+            .column(0)
+            .as_any()
+            .downcast_ref::<StructArray>()
+            .unwrap();
+        let top_level_array = struct_array
+            .column(1)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+
+        // Row 0: Valid nested struct
+        assert!(!outer_array.is_null(0), "Nested struct should not be null");
+        let inner_a_array = outer_array
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        let inner_b_array = outer_array
+            .column(1)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        assert_eq!(inner_a_array.value(0), 123);
+        assert_eq!(inner_b_array.value(0), "hello");
+        assert_eq!(top_level_array.value(0), 999);
+
+        // Row 1: Missing nested field
+        assert!(!outer_array.is_null(1));
+        assert_eq!(inner_a_array.value(1), 456);
+        assert!(inner_b_array.is_null(1));
+        assert_eq!(top_level_array.value(1), 888);
+
+        // Row 2: Null nested struct
+        assert!(outer_array.is_null(2), "Nested struct should be null");
+        assert_eq!(top_level_array.value(2), 777);
+
+        // Row 3: Missing nested struct
+        assert!(outer_array.is_null(3), "Nested struct should be null");
+        assert_eq!(top_level_array.value(3), 666);
+
+        Ok(())
+    }
+}
diff --git a/native/spark-expr/src/json_funcs/mod.rs 
b/native/spark-expr/src/json_funcs/mod.rs
index de3037590..9f025070d 100644
--- a/native/spark-expr/src/json_funcs/mod.rs
+++ b/native/spark-expr/src/json_funcs/mod.rs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+mod from_json;
 mod to_json;
 
+pub use from_json::FromJson;
 pub use to_json::ToJson;
diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs
index 2903061d6..96e727ae5 100644
--- a/native/spark-expr/src/lib.rs
+++ b/native/spark-expr/src/lib.rs
@@ -71,7 +71,7 @@ pub use comet_scalar_funcs::{
 pub use datetime_funcs::{SparkDateTrunc, SparkHour, SparkMinute, SparkSecond, 
TimestampTruncExpr};
 pub use error::{SparkError, SparkResult};
 pub use hash_funcs::*;
-pub use json_funcs::ToJson;
+pub use json_funcs::{FromJson, ToJson};
 pub use math_funcs::{
     create_modulo_expr, create_negate_expr, spark_ceil, spark_decimal_div,
     spark_decimal_integral_div, spark_floor, spark_make_decimal, spark_round, 
spark_unhex,
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala 
b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 54df2f168..83917d33f 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -131,6 +131,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
     classOf[CreateNamedStruct] -> CometCreateNamedStruct,
     classOf[GetArrayStructFields] -> CometGetArrayStructFields,
     classOf[GetStructField] -> CometGetStructField,
+    classOf[JsonToStructs] -> CometJsonToStructs,
     classOf[StructsToJson] -> CometStructsToJson)
 
   private val hashExpressions: Map[Class[_ <: Expression], 
CometExpressionSerde[_]] = Map(
diff --git a/spark/src/main/scala/org/apache/comet/serde/structs.scala 
b/spark/src/main/scala/org/apache/comet/serde/structs.scala
index 208b2e126..55e031d34 100644
--- a/spark/src/main/scala/org/apache/comet/serde/structs.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/structs.scala
@@ -21,11 +21,11 @@ package org.apache.comet.serde
 
 import scala.jdk.CollectionConverters._
 
-import org.apache.spark.sql.catalyst.expressions.{Attribute, 
CreateNamedStruct, GetArrayStructFields, GetStructField, StructsToJson}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
CreateNamedStruct, GetArrayStructFields, GetStructField, JsonToStructs, 
StructsToJson}
 import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, MapType, 
StructType}
 
 import org.apache.comet.CometSparkSessionExtensions.withInfo
-import org.apache.comet.serde.QueryPlanSerde.exprToProtoInternal
+import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, 
serializeDataType}
 
 object CometCreateNamedStruct extends CometExpressionSerde[CreateNamedStruct] {
   override def convert(
@@ -167,3 +167,67 @@ object CometStructsToJson extends 
CometExpressionSerde[StructsToJson] {
     }
   }
 }
+
+object CometJsonToStructs extends CometExpressionSerde[JsonToStructs] {
+
+  override def getSupportLevel(expr: JsonToStructs): SupportLevel = {
+    // this feature is partially implemented and not comprehensively tested yet
+    Incompatible()
+  }
+
+  override def convert(
+      expr: JsonToStructs,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+
+    if (expr.schema == null) {
+      withInfo(expr, "from_json requires explicit schema")
+      return None
+    }
+
+    def isSupportedType(dt: DataType): Boolean = {
+      dt match {
+        case StructType(fields) =>
+          fields.nonEmpty && fields.forall(f => isSupportedType(f.dataType))
+        case DataTypes.IntegerType | DataTypes.LongType | DataTypes.FloatType |
+            DataTypes.DoubleType | DataTypes.BooleanType | 
DataTypes.StringType =>
+          true
+        case _ => false
+      }
+    }
+
+    val schemaType = expr.schema
+    if (!isSupportedType(schemaType)) {
+      withInfo(expr, "from_json: Unsupported schema type")
+      return None
+    }
+
+    val options = expr.options
+    if (options.nonEmpty) {
+      val mode = options.getOrElse("mode", "PERMISSIVE")
+      if (mode != "PERMISSIVE") {
+        withInfo(expr, s"from_json: Only PERMISSIVE mode supported, got: 
$mode")
+        return None
+      }
+      val knownOptions = Set("mode")
+      val unknownOpts = options.keySet -- knownOptions
+      if (unknownOpts.nonEmpty) {
+        withInfo(expr, s"from_json: Ignoring unsupported options: 
${unknownOpts.mkString(", ")}")
+      }
+    }
+
+    // Convert child expression and schema to protobuf
+    for {
+      childProto <- exprToProtoInternal(expr.child, inputs, binding)
+      schemaProto <- serializeDataType(schemaType)
+    } yield {
+      val fromJson = ExprOuterClass.FromJson
+        .newBuilder()
+        .setChild(childProto)
+        .setSchema(schemaProto)
+        .setTimezone(expr.timeZoneId.getOrElse("UTC"))
+        .build()
+      ExprOuterClass.Expr.newBuilder().setFromJson(fromJson).build()
+    }
+  }
+}
diff --git 
a/spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala 
b/spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala
new file mode 100644
index 000000000..38f576526
--- /dev/null
+++ b/spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet
+
+import org.scalactic.source.Position
+import org.scalatest.Tag
+
+import org.apache.spark.sql.CometTestBase
+import org.apache.spark.sql.catalyst.expressions.JsonToStructs
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+
+class CometJsonExpressionSuite extends CometTestBase with 
AdaptiveSparkPlanHelper {
+
+  override protected def test(testName: String, testTags: Tag*)(testFun: => 
Any)(implicit
+      pos: Position): Unit = {
+    super.test(testName, testTags: _*) {
+      
withSQLConf(CometConf.getExprAllowIncompatConfigKey(classOf[JsonToStructs]) -> 
"true") {
+        testFun
+      }
+    }
+  }
+
+  test("from_json - basic primitives") {
+    Seq(true, false).foreach { dictionaryEnabled =>
+      withParquetTable(
+        (0 until 100).map(i => {
+          val json = s"""{"a":$i,"b":"str_$i"}"""
+          (i, json)
+        }),
+        "tbl",
+        withDictionary = dictionaryEnabled) {
+
+        val schema = "a INT, b STRING"
+        checkSparkAnswerAndOperator(s"SELECT from_json(_2, '$schema') FROM 
tbl")
+        checkSparkAnswerAndOperator(s"SELECT from_json(_2, '$schema').a FROM 
tbl")
+        checkSparkAnswerAndOperator(s"SELECT from_json(_2, '$schema').b FROM 
tbl")
+      }
+    }
+  }
+
+  test("from_json - null and error handling") {
+    Seq(true, false).foreach { dictionaryEnabled =>
+      withParquetTable(
+        Seq(
+          (1, """{"a":123,"b":"test"}"""), // Valid JSON
+          (2, """{"a":456}"""), // Missing field b
+          (3, """{"a":null}"""), // Explicit null
+          (4, """invalid json"""), // Parse error
+          (5, """{}"""), // Empty object
+          (6, """null"""), // JSON null
+          (7, null) // Null input
+        ),
+        "tbl",
+        withDictionary = dictionaryEnabled) {
+
+        val schema = "a INT, b STRING"
+        checkSparkAnswerAndOperator(
+          s"SELECT _1, from_json(_2, '$schema') as parsed FROM tbl ORDER BY 
_1")
+      }
+    }
+  }
+
+  test("from_json - all primitive types") {
+    Seq(true, false).foreach { dictionaryEnabled =>
+      withParquetTable(
+        (0 until 50).map(i => {
+          val sign = if (i % 2 == 0) 1 else -1
+          val json =
+            s"""{"i32":${sign * i},"i64":${sign * i * 
1000000000L},"f32":${sign * i * 1.5},"f64":${sign * i * 2.5},"bool":${i % 2 == 
0},"str":"value_$i"}"""
+          (i, json)
+        }),
+        "tbl",
+        withDictionary = dictionaryEnabled) {
+
+        val schema = "i32 INT, i64 BIGINT, f32 FLOAT, f64 DOUBLE, bool 
BOOLEAN, str STRING"
+        checkSparkAnswerAndOperator(s"SELECT from_json(_2, '$schema') FROM 
tbl")
+        checkSparkAnswerAndOperator(s"SELECT from_json(_2, '$schema').i32 FROM 
tbl")
+        checkSparkAnswerAndOperator(s"SELECT from_json(_2, '$schema').str FROM 
tbl")
+      }
+    }
+  }
+
+  test("from_json - null input produces null struct") {
+    Seq(true, false).foreach { dictionaryEnabled =>
+      withParquetTable(
+        Seq(
+          (1, """{"a":1,"b":"x"}"""), // Valid JSON to establish column type
+          (2, null) // Null input
+        ),
+        "tbl",
+        withDictionary = dictionaryEnabled) {
+
+        val schema = "a INT, b STRING"
+        // Verify that null input produces a NULL struct (not a struct with 
null fields)
+        checkSparkAnswerAndOperator(
+          s"SELECT _1, from_json(_2, '$schema') IS NULL as struct_is_null FROM 
tbl WHERE _1 = 2")
+        // Field access on null struct should return null
+        checkSparkAnswerAndOperator(
+          s"SELECT _1, from_json(_2, '$schema').a FROM tbl WHERE _1 = 2")
+      }
+    }
+  }
+
+  test("from_json - nested struct") {
+    Seq(true, false).foreach { dictionaryEnabled =>
+      withParquetTable(
+        (0 until 50).map(i => {
+          val json = 
s"""{"outer":{"inner_a":$i,"inner_b":"nested_$i"},"top_level":${i * 10}}"""
+          (i, json)
+        }),
+        "tbl",
+        withDictionary = dictionaryEnabled) {
+
+        val schema = "outer STRUCT<inner_a: INT, inner_b: STRING>, top_level 
INT"
+        checkSparkAnswerAndOperator(s"SELECT from_json(_2, '$schema') FROM 
tbl")
+        checkSparkAnswerAndOperator(s"SELECT from_json(_2, '$schema').outer 
FROM tbl")
+        checkSparkAnswerAndOperator(s"SELECT from_json(_2, 
'$schema').outer.inner_a FROM tbl")
+        checkSparkAnswerAndOperator(s"SELECT from_json(_2, 
'$schema').top_level FROM tbl")
+      }
+    }
+  }
+
+  test("from_json - valid json with incompatible schema") {
+    Seq(true, false).foreach { dictionaryEnabled =>
+      withParquetTable(
+        Seq(
+          (1, """{"a":"not_a_number","b":"test"}"""), // String where INT 
expected
+          (2, """{"a":123,"b":456}"""), // Number where STRING expected
+          (3, """{"a":{"nested":"value"},"b":"test"}"""), // Object where INT 
expected
+          (4, """{"a":[1,2,3],"b":"test"}"""), // Array where INT expected
+          (5, """{"a":123.456,"b":"test"}"""), // Float where INT expected
+          (6, """{"a":true,"b":"test"}"""), // Boolean where INT expected
+          (7, """{"a":123,"b":null}""") // Null value for STRING field
+        ),
+        "tbl",
+        withDictionary = dictionaryEnabled) {
+
+        val schema = "a INT, b STRING"
+        // When types don't match, Spark typically returns null for that field
+        checkSparkAnswerAndOperator(
+          s"SELECT _1, from_json(_2, '$schema') as parsed FROM tbl ORDER BY 
_1")
+        checkSparkAnswerAndOperator(s"SELECT _1, from_json(_2, '$schema').a 
FROM tbl ORDER BY _1")
+        checkSparkAnswerAndOperator(s"SELECT _1, from_json(_2, '$schema').b 
FROM tbl ORDER BY _1")
+      }
+    }
+  }
+}
diff --git 
a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometJsonExpressionBenchmark.scala
 
b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometJsonExpressionBenchmark.scala
new file mode 100644
index 000000000..e8bd00bd9
--- /dev/null
+++ 
b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometJsonExpressionBenchmark.scala
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.benchmark
+
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.catalyst.expressions.JsonToStructs
+
+import org.apache.comet.CometConf
+
+/**
+ * Configuration for a JSON expression benchmark.
+ * @param name
+ *   Name for the benchmark
+ * @param schema
+ *   Target schema for from_json
+ * @param query
+ *   SQL query to benchmark
+ * @param extraCometConfigs
+ *   Additional Comet configurations for the scan+exec case
+ */
+case class JsonExprConfig(
+    name: String,
+    schema: String,
+    query: String,
+    extraCometConfigs: Map[String, String] = Map.empty)
+
+// spotless:off
+/**
+ * Benchmark to measure performance of Comet JSON expressions. To run this 
benchmark:
+ * `SPARK_GENERATE_BENCHMARK_FILES=1 make 
benchmark-org.apache.spark.sql.benchmark.CometJsonExpressionBenchmark`
+ * Results will be written to 
"spark/benchmarks/CometJsonExpressionBenchmark-**results.txt".
+ */
+// spotless:on
+object CometJsonExpressionBenchmark extends CometBenchmarkBase {
+
+  /**
+   * Generic method to run a JSON expression benchmark with the given 
configuration.
+   */
+  def runJsonExprBenchmark(config: JsonExprConfig, values: Int): Unit = {
+    val benchmark = new Benchmark(config.name, values, output = output)
+
+    withTempPath { dir =>
+      withTempTable("parquetV1Table") {
+        // Generate data with specified JSON patterns
+        val jsonData = config.name match {
+          case "from_json - simple primitives" =>
+            spark.sql(s"""
+              SELECT
+                concat('{"a":', CAST(value AS STRING), ',"b":"str_', 
CAST(value AS STRING), '"}') AS json_str
+              FROM $tbl
+            """)
+
+          case "from_json - all primitive types" =>
+            spark.sql(s"""
+              SELECT
+                concat(
+                  '{"i32":', CAST(value % 1000 AS STRING),
+                  ',"i64":', CAST(value * 1000000000L AS STRING),
+                  ',"f32":', CAST(value * 1.5 AS STRING),
+                  ',"f64":', CAST(value * 2.5 AS STRING),
+                  ',"bool":', CASE WHEN value % 2 = 0 THEN 'true' ELSE 'false' 
END,
+                  ',"str":"value_', CAST(value AS STRING), '"}'
+                ) AS json_str
+              FROM $tbl
+            """)
+
+          case "from_json - with nulls" =>
+            spark.sql(s"""
+              SELECT
+                CASE
+                  WHEN value % 10 = 0 THEN NULL
+                  WHEN value % 5 = 0 THEN '{"a":null,"b":"test"}'
+                  WHEN value % 3 = 0 THEN '{"a":123}'
+                  ELSE concat('{"a":', CAST(value AS STRING), ',"b":"str_', 
CAST(value AS STRING), '"}')
+                END AS json_str
+              FROM $tbl
+            """)
+
+          case "from_json - nested struct" =>
+            spark.sql(s"""
+              SELECT
+                concat(
+                  '{"outer":{"inner_a":', CAST(value AS STRING),
+                  ',"inner_b":"nested_', CAST(value AS STRING), '"}}') AS 
json_str
+              FROM $tbl
+            """)
+
+          case "from_json - field access" =>
+            spark.sql(s"""
+              SELECT
+                concat('{"a":', CAST(value AS STRING), ',"b":"str_', 
CAST(value AS STRING), '"}') AS json_str
+              FROM $tbl
+            """)
+
+          case _ =>
+            spark.sql(s"""
+              SELECT
+                concat('{"a":', CAST(value AS STRING), ',"b":"str_', 
CAST(value AS STRING), '"}') AS json_str
+              FROM $tbl
+            """)
+        }
+
+        prepareTable(dir, jsonData)
+
+        benchmark.addCase("SQL Parquet - Spark") { _ =>
+          spark.sql(config.query).noop()
+        }
+
+        benchmark.addCase("SQL Parquet - Comet (Scan)") { _ =>
+          withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
+            spark.sql(config.query).noop()
+          }
+        }
+
+        benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ =>
+          val baseConfigs =
+            Map(
+              CometConf.COMET_ENABLED.key -> "true",
+              CometConf.COMET_EXEC_ENABLED.key -> "true",
+              CometConf.getExprAllowIncompatConfigKey(classOf[JsonToStructs]) 
-> "true",
+              "spark.sql.optimizer.constantFolding.enabled" -> "false")
+          val allConfigs = baseConfigs ++ config.extraCometConfigs
+
+          withSQLConf(allConfigs.toSeq: _*) {
+            spark.sql(config.query).noop()
+          }
+        }
+
+        benchmark.run()
+      }
+    }
+  }
+
+  // Configuration for all JSON expression benchmarks
+  private val jsonExpressions = List(
+    JsonExprConfig(
+      "from_json - simple primitives",
+      "a INT, b STRING",
+      "SELECT from_json(json_str, 'a INT, b STRING') FROM parquetV1Table"),
+    JsonExprConfig(
+      "from_json - all primitive types",
+      "i32 INT, i64 BIGINT, f32 FLOAT, f64 DOUBLE, bool BOOLEAN, str STRING",
+      "SELECT from_json(json_str, 'i32 INT, i64 BIGINT, f32 FLOAT, f64 DOUBLE, 
bool BOOLEAN, str STRING') FROM parquetV1Table"),
+    JsonExprConfig(
+      "from_json - with nulls",
+      "a INT, b STRING",
+      "SELECT from_json(json_str, 'a INT, b STRING') FROM parquetV1Table"),
+    JsonExprConfig(
+      "from_json - nested struct",
+      "outer STRUCT<inner_a: INT, inner_b: STRING>",
+      "SELECT from_json(json_str, 'outer STRUCT<inner_a: INT, inner_b: 
STRING>') FROM parquetV1Table"),
+    JsonExprConfig(
+      "from_json - field access",
+      "a INT, b STRING",
+      "SELECT from_json(json_str, 'a INT, b STRING').a FROM parquetV1Table"))
+
+  override def runCometBenchmark(mainArgs: Array[String]): Unit = {
+    val values = 1024 * 1024
+
+    jsonExpressions.foreach { config =>
+      runBenchmarkWithTable(config.name, values) { v =>
+        runJsonExprBenchmark(config, v)
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to