alamb commented on code in PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#discussion_r2192262877


##########
datafusion-examples/examples/variant_shredding.rs:
##########
@@ -0,0 +1,408 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::array::{RecordBatch, StringArray};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use async_trait::async_trait;
+
+use datafusion::assert_batches_eq;
+use datafusion::catalog::memory::DataSourceExec;
+use datafusion::catalog::{Session, TableProvider};
+use datafusion::common::tree_node::{
+    Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
+};
+use datafusion::common::{assert_contains, DFSchema, Result};
+use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::physical_plan::{FileScanConfigBuilder, 
ParquetSource};
+use datafusion::execution::context::SessionContext;
+use datafusion::execution::object_store::ObjectStoreUrl;
+use datafusion::logical_expr::utils::conjunction;
+use datafusion::logical_expr::{
+    ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, 
Signature,
+    TableProviderFilterPushDown, TableType, Volatility,
+};
+use datafusion::parquet::arrow::ArrowWriter;
+use datafusion::parquet::file::properties::WriterProperties;
+use datafusion::physical_expr::schema_rewriter::PhysicalExprSchemaRewriteHook;
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion::physical_expr::{expressions, ScalarFunctionExpr};
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::{lit, SessionConfig};
+use datafusion::scalar::ScalarValue;
+use futures::StreamExt;
+use object_store::memory::InMemory;
+use object_store::path::Path;
+use object_store::{ObjectStore, PutPayload};
+
+// Example showing how to implement custom filter rewriting for variant 
shredding.

Review Comment:
   Since this example doesn't actually use variants, it uses JSON, what would 
you think about calling this example `json_shredding` instead?
   
   I can also help out with adding some more background here and diagrams -- I 
think the important things to point out are:
   1. This is a technique for speeding up queries on "semi-structured" data 
such as JSON
   2. It works by materializing ("shredding") one or more columns separately 
from the JSOn to take advantage of columnar storage
   3. However, this requires the query engine to rewrite predicates against the 
semi-structured column into predicates on shredded columns when available



##########
datafusion-examples/examples/variant_shredding.rs:
##########
@@ -0,0 +1,408 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::array::{RecordBatch, StringArray};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use async_trait::async_trait;
+
+use datafusion::assert_batches_eq;
+use datafusion::catalog::memory::DataSourceExec;
+use datafusion::catalog::{Session, TableProvider};
+use datafusion::common::tree_node::{
+    Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
+};
+use datafusion::common::{assert_contains, DFSchema, Result};
+use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::physical_plan::{FileScanConfigBuilder, 
ParquetSource};
+use datafusion::execution::context::SessionContext;
+use datafusion::execution::object_store::ObjectStoreUrl;
+use datafusion::logical_expr::utils::conjunction;
+use datafusion::logical_expr::{
+    ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, 
Signature,
+    TableProviderFilterPushDown, TableType, Volatility,
+};
+use datafusion::parquet::arrow::ArrowWriter;
+use datafusion::parquet::file::properties::WriterProperties;
+use datafusion::physical_expr::schema_rewriter::PhysicalExprSchemaRewriteHook;
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion::physical_expr::{expressions, ScalarFunctionExpr};
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::{lit, SessionConfig};
+use datafusion::scalar::ScalarValue;
+use futures::StreamExt;
+use object_store::memory::InMemory;
+use object_store::path::Path;
+use object_store::{ObjectStore, PutPayload};
+
+// Example showing how to implement custom filter rewriting for variant 
shredding.
+//
+// In this example, we have a table with flat columns using underscore 
prefixes:
+// data: "...", _data.name: "..."
+//
+// Our custom TableProvider will use a FilterExpressionRewriter to rewrite
+// expressions like `json_get_str('name', data)` to use a flattened column name
+// `_data.name` if it exists in the file schema.
+#[tokio::main]
+async fn main() -> Result<()> {
+    println!("=== Creating example data with flat columns and underscore 
prefixes ===");
+
+    // Create sample data with flat columns using underscore prefixes
+    let (table_schema, batch) = create_sample_data();
+
+    let store = InMemory::new();

Review Comment:
   sylistically, I think this example would read better if the data setup was 
in its own function -- so readers could see "setup the data" as a single line 
and not worry about the details unless they needed to



##########
datafusion-examples/examples/variant_shredding.rs:
##########
@@ -0,0 +1,408 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::array::{RecordBatch, StringArray};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use async_trait::async_trait;
+
+use datafusion::assert_batches_eq;
+use datafusion::catalog::memory::DataSourceExec;
+use datafusion::catalog::{Session, TableProvider};
+use datafusion::common::tree_node::{
+    Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
+};
+use datafusion::common::{assert_contains, DFSchema, Result};
+use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::physical_plan::{FileScanConfigBuilder, 
ParquetSource};
+use datafusion::execution::context::SessionContext;
+use datafusion::execution::object_store::ObjectStoreUrl;
+use datafusion::logical_expr::utils::conjunction;
+use datafusion::logical_expr::{
+    ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, 
Signature,
+    TableProviderFilterPushDown, TableType, Volatility,
+};
+use datafusion::parquet::arrow::ArrowWriter;
+use datafusion::parquet::file::properties::WriterProperties;
+use datafusion::physical_expr::schema_rewriter::PhysicalExprSchemaRewriteHook;
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion::physical_expr::{expressions, ScalarFunctionExpr};
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::{lit, SessionConfig};
+use datafusion::scalar::ScalarValue;
+use futures::StreamExt;
+use object_store::memory::InMemory;
+use object_store::path::Path;
+use object_store::{ObjectStore, PutPayload};
+
+// Example showing how to implement custom filter rewriting for variant 
shredding.
+//
+// In this example, we have a table with flat columns using underscore 
prefixes:
+// data: "...", _data.name: "..."
+//
+// Our custom TableProvider will use a FilterExpressionRewriter to rewrite
+// expressions like `json_get_str('name', data)` to use a flattened column name
+// `_data.name` if it exists in the file schema.
+#[tokio::main]
+async fn main() -> Result<()> {
+    println!("=== Creating example data with flat columns and underscore 
prefixes ===");
+
+    // Create sample data with flat columns using underscore prefixes
+    let (table_schema, batch) = create_sample_data();
+
+    let store = InMemory::new();
+    let buf = {
+        let mut buf = vec![];
+
+        let props = WriterProperties::builder()
+            .set_max_row_group_size(2)
+            .build();
+
+        let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), 
Some(props))
+            .expect("creating writer");
+
+        writer.write(&batch).expect("Writing batch");
+        writer.close().unwrap();
+        buf
+    };
+    let path = Path::from("example.parquet");
+    let payload = PutPayload::from_bytes(buf.into());
+    store.put(&path, payload).await?;
+
+    // Create a custom table provider that rewrites struct field access
+    let table_provider = Arc::new(ExampleTableProvider::new(table_schema));
+
+    // Set up query execution
+    let mut cfg = SessionConfig::new();
+    cfg.options_mut().execution.parquet.pushdown_filters = true;
+    let ctx = SessionContext::new_with_config(cfg);
+
+    // Register our table
+    ctx.register_table("structs", table_provider)?;
+    ctx.register_udf(ScalarUDF::new_from_impl(JsonGetStr::default()));
+
+    ctx.runtime_env().register_object_store(
+        ObjectStoreUrl::parse("memory://")?.as_ref(),
+        Arc::new(store),
+    );
+
+    println!("\n=== Showing all data ===");
+    let batches = ctx.sql("SELECT * FROM structs").await?.collect().await?;
+    arrow::util::pretty::print_batches(&batches)?;
+
+    println!("\n=== Running query with flat column access and filter ===");
+    let query = "SELECT json_get_str('age', data) as age FROM structs WHERE 
json_get_str('name', data) = 'Bob'";
+    println!("Query: {query}");
+
+    let batches = ctx.sql(query).await?.collect().await?;
+
+    #[rustfmt::skip]
+    let expected = [
+        "+-----+",
+        "| age |",
+        "+-----+",
+        "| 25  |",
+        "+-----+",
+    ];
+    arrow::util::pretty::print_batches(&batches)?;
+    assert_batches_eq!(expected, &batches);
+
+    println!("\n=== Running explain analyze to confirm row group pruning ===");
+
+    let batches = ctx
+        .sql(&format!("EXPLAIN ANALYZE {query}"))
+        .await?
+        .collect()
+        .await?;
+    let plan = format!("{}", 
arrow::util::pretty::pretty_format_batches(&batches)?);
+    println!("{plan}");
+    assert_contains!(&plan, "row_groups_pruned_statistics=1");
+    assert_contains!(&plan, "pushdown_rows_pruned=1");
+
+    Ok(())
+}
+
+/// Create the example data with flat columns using underscore prefixes.
+/// The table schema has `data` column, while the file schema has both `data` 
and `_data.name` as flat columns.

Review Comment:
   It might help here to provide a comment with what the data logically looks 
like too (as it may not be clear what this is supposed to be modeling)
   
   ```json
   { "age": 30, "name": "Alice"}
   ```
   
   ```json
   { "age": 25, "name": "Bob" }
   ```
   ...
   
   



##########
datafusion/physical-expr/src/schema_rewriter.rs:
##########
@@ -29,6 +29,16 @@ use datafusion_common::{
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 
 use crate::expressions::{self, CastExpr, Column};
+pub trait PhysicalExprSchemaRewriteHook: Send + Sync + std::fmt::Debug {

Review Comment:
   The "hook" terminology seemed somewhat different than the rest of the repo 
to me
   
   What would you think about calling this `PhysicalSchemaExprRewriter` (and 
adjusting the various registration methods and field names)?
   
   



##########
datafusion/physical-expr/src/schema_rewriter.rs:
##########
@@ -118,10 +129,22 @@ impl<'a> PhysicalExprSchemaRewriter<'a> {
                 if let Some(partition_value) = 
self.get_partition_value(column.name()) {
                     return 
Ok(Transformed::yes(expressions::lit(partition_value)));
                 }
-                // If the column is not found in the logical schema and is not 
a partition value, return an error
-                // This should probably never be hit unless something upstream 
broke, but nontheless it's better
-                // for us to return a handleable error than to panic / do 
something unexpected.
-                return Err(e.into());
+                // This can be hit if a custom rewrite injected a reference to 
a column that doesn't exist in the logical schema.
+                // For example, a pre-computed column that is kept only in the 
physical schema.
+                // If the column exists in the physical schema, we can still 
use it.
+                if let Ok(physical_field) =
+                    self.physical_file_schema.field_with_name(column.name())
+                {
+                    // If the column exists in the physical schema, we can use 
it in place of the logical column.
+                    // This is nice to users because if they do a rewrite that 
results in something like `phyiscal_int32_col = 123u64`
+                    // we'll at least handle the casts for them.
+                    physical_field
+                } else {
+                    // A completely unknwon column that doesn't exist in 
either schema!
+                    // This should probably never be hit unless something 
upstream broke, but nontheless it's better
+                    // for us to return a handleable error than to panic / do 
something unexpected.

Review Comment:
   agree



##########
datafusion-examples/examples/variant_shredding.rs:
##########
@@ -0,0 +1,408 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::array::{RecordBatch, StringArray};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use async_trait::async_trait;
+
+use datafusion::assert_batches_eq;
+use datafusion::catalog::memory::DataSourceExec;
+use datafusion::catalog::{Session, TableProvider};
+use datafusion::common::tree_node::{
+    Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
+};
+use datafusion::common::{assert_contains, DFSchema, Result};
+use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::physical_plan::{FileScanConfigBuilder, 
ParquetSource};
+use datafusion::execution::context::SessionContext;
+use datafusion::execution::object_store::ObjectStoreUrl;
+use datafusion::logical_expr::utils::conjunction;
+use datafusion::logical_expr::{
+    ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, 
Signature,
+    TableProviderFilterPushDown, TableType, Volatility,
+};
+use datafusion::parquet::arrow::ArrowWriter;
+use datafusion::parquet::file::properties::WriterProperties;
+use datafusion::physical_expr::schema_rewriter::PhysicalExprSchemaRewriteHook;
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion::physical_expr::{expressions, ScalarFunctionExpr};
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::{lit, SessionConfig};
+use datafusion::scalar::ScalarValue;
+use futures::StreamExt;
+use object_store::memory::InMemory;
+use object_store::path::Path;
+use object_store::{ObjectStore, PutPayload};
+
+// Example showing how to implement custom filter rewriting for variant 
shredding.
+//
+// In this example, we have a table with flat columns using underscore 
prefixes:
+// data: "...", _data.name: "..."
+//
+// Our custom TableProvider will use a FilterExpressionRewriter to rewrite
+// expressions like `json_get_str('name', data)` to use a flattened column name
+// `_data.name` if it exists in the file schema.
+#[tokio::main]
+async fn main() -> Result<()> {
+    println!("=== Creating example data with flat columns and underscore 
prefixes ===");
+
+    // Create sample data with flat columns using underscore prefixes
+    let (table_schema, batch) = create_sample_data();
+
+    let store = InMemory::new();
+    let buf = {
+        let mut buf = vec![];
+
+        let props = WriterProperties::builder()
+            .set_max_row_group_size(2)
+            .build();
+
+        let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), 
Some(props))
+            .expect("creating writer");
+
+        writer.write(&batch).expect("Writing batch");
+        writer.close().unwrap();
+        buf
+    };
+    let path = Path::from("example.parquet");
+    let payload = PutPayload::from_bytes(buf.into());
+    store.put(&path, payload).await?;
+
+    // Create a custom table provider that rewrites struct field access
+    let table_provider = Arc::new(ExampleTableProvider::new(table_schema));
+
+    // Set up query execution
+    let mut cfg = SessionConfig::new();
+    cfg.options_mut().execution.parquet.pushdown_filters = true;
+    let ctx = SessionContext::new_with_config(cfg);
+
+    // Register our table
+    ctx.register_table("structs", table_provider)?;
+    ctx.register_udf(ScalarUDF::new_from_impl(JsonGetStr::default()));
+
+    ctx.runtime_env().register_object_store(
+        ObjectStoreUrl::parse("memory://")?.as_ref(),
+        Arc::new(store),
+    );
+
+    println!("\n=== Showing all data ===");
+    let batches = ctx.sql("SELECT * FROM structs").await?.collect().await?;
+    arrow::util::pretty::print_batches(&batches)?;
+
+    println!("\n=== Running query with flat column access and filter ===");
+    let query = "SELECT json_get_str('age', data) as age FROM structs WHERE 
json_get_str('name', data) = 'Bob'";
+    println!("Query: {query}");
+
+    let batches = ctx.sql(query).await?.collect().await?;
+
+    #[rustfmt::skip]
+    let expected = [
+        "+-----+",
+        "| age |",
+        "+-----+",
+        "| 25  |",
+        "+-----+",
+    ];
+    arrow::util::pretty::print_batches(&batches)?;
+    assert_batches_eq!(expected, &batches);
+
+    println!("\n=== Running explain analyze to confirm row group pruning ===");
+
+    let batches = ctx
+        .sql(&format!("EXPLAIN ANALYZE {query}"))
+        .await?
+        .collect()
+        .await?;
+    let plan = format!("{}", 
arrow::util::pretty::pretty_format_batches(&batches)?);
+    println!("{plan}");
+    assert_contains!(&plan, "row_groups_pruned_statistics=1");
+    assert_contains!(&plan, "pushdown_rows_pruned=1");
+
+    Ok(())
+}
+
+/// Create the example data with flat columns using underscore prefixes.
+/// The table schema has `data` column, while the file schema has both `data` 
and `_data.name` as flat columns.
+fn create_sample_data() -> (SchemaRef, RecordBatch) {
+    // The table schema only has the main data column
+    let table_schema = Schema::new(vec![Field::new("data", DataType::Utf8, 
false)]);
+
+    // The file schema has both the main column and the shredded flat column 
with underscore prefix
+    let file_schema = Schema::new(vec![
+        Field::new("data", DataType::Utf8, false),
+        Field::new("_data.name", DataType::Utf8, false),
+    ]);
+
+    // Build a RecordBatch with flat columns
+    let data_array = StringArray::from(vec![
+        r#"{"age": 30}"#,
+        r#"{"age": 25}"#,
+        r#"{"age": 35}"#,
+        r#"{"age": 22}"#,
+    ]);
+    let names_array = StringArray::from(vec!["Alice", "Bob", "Charlie", 
"Dave"]);
+
+    (
+        Arc::new(table_schema),
+        RecordBatch::try_new(
+            Arc::new(file_schema),
+            vec![Arc::new(data_array), Arc::new(names_array)],
+        )
+        .unwrap(),
+    )
+}
+
+/// Custom TableProvider that uses a StructFieldRewriter
+#[derive(Debug)]
+struct ExampleTableProvider {
+    schema: SchemaRef,
+}
+
+impl ExampleTableProvider {
+    fn new(schema: SchemaRef) -> Self {
+        Self { schema }
+    }
+}
+
+#[async_trait]
+impl TableProvider for ExampleTableProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    fn supports_filters_pushdown(
+        &self,
+        filters: &[&Expr],
+    ) -> Result<Vec<TableProviderFilterPushDown>> {
+        // Implementers can choose to mark these filters as exact or inexact.
+        // If marked as exact they cannot have false positives and must always 
be applied.
+        // If marked as Inexact they can have false positives and at runtime 
the rewriter
+        // can decide to not rewrite / ignore some filters since they will be 
re-evaluated upstream.
+        // For the purposes of this example we mark them as Exact to 
demonstrate the rewriter is working and the filtering is not being re-evaluated 
upstream.
+        Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
+    }
+
+    async fn scan(
+        &self,
+        state: &dyn Session,
+        projection: Option<&Vec<usize>>,
+        filters: &[Expr],
+        limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let schema = self.schema.clone();
+        let df_schema = DFSchema::try_from(schema.clone())?;
+        let filter = state.create_physical_expr(
+            conjunction(filters.iter().cloned()).unwrap_or_else(|| lit(true)),
+            &df_schema,
+        )?;
+
+        let parquet_source = ParquetSource::default()
+            .with_predicate(filter)
+            .with_pushdown_filters(true)
+            // if the rewriter needs a reference to the table schema you can 
bind self.schema() here
+            .with_predicate_rewrite_hook(Arc::new(ShreddedVariantRewriter) as 
_);
+
+        let object_store_url = ObjectStoreUrl::parse("memory://")?;
+
+        let store = state.runtime_env().object_store(object_store_url)?;
+
+        let mut files = vec![];
+        let mut listing = store.list(None);
+        while let Some(file) = listing.next().await {
+            if let Ok(file) = file {
+                files.push(file);
+            }
+        }
+
+        let file_group = files
+            .iter()
+            .map(|file| PartitionedFile::new(file.location.clone(), file.size))
+            .collect();
+
+        let file_scan_config = FileScanConfigBuilder::new(
+            ObjectStoreUrl::parse("memory://")?,
+            schema,
+            Arc::new(parquet_source),
+        )
+        .with_projection(projection.cloned())
+        .with_limit(limit)
+        .with_file_group(file_group);
+
+        Ok(Arc::new(DataSourceExec::new(Arc::new(
+            file_scan_config.build(),
+        ))))
+    }
+}
+
+/// Scalar UDF that uses serde_json to access json fields
+#[derive(Debug)]
+pub struct JsonGetStr {
+    signature: Signature,
+    aliases: [String; 1],
+}
+
+impl Default for JsonGetStr {
+    fn default() -> Self {
+        Self {
+            signature: Signature::variadic_any(Volatility::Immutable),
+            aliases: ["json_get_str".to_string()],
+        }
+    }
+}
+
+impl ScalarUDFImpl for JsonGetStr {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        self.aliases[0].as_str()
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        Ok(DataType::Utf8)
+    }
+
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> 
Result<ColumnarValue> {
+        assert!(
+            args.args.len() == 2,
+            "json_get_str requires exactly 2 arguments"
+        );
+        let key = match &args.args[0] {
+            ColumnarValue::Scalar(ScalarValue::Utf8(Some(key))) => key,
+            _ => {
+                return Err(datafusion::error::DataFusionError::Execution(
+                    "json_get_str first argument must be a string".to_string(),
+                ))
+            }
+        };
+        // We expect a string array that contains JSON strings
+        let json_array = match &args.args[1] {
+            ColumnarValue::Array(array) => array
+                .as_any()
+                .downcast_ref::<StringArray>()
+                .ok_or_else(|| {
+                datafusion::error::DataFusionError::Execution(
+                    "json_get_str second argument must be a string 
array".to_string(),
+                )
+            })?,
+            _ => {
+                return Err(datafusion::error::DataFusionError::Execution(
+                    "json_get_str second argument must be a string 
array".to_string(),
+                ))
+            }
+        };
+        let values = json_array
+            .iter()
+            .map(|value| {
+                value.and_then(|v| {
+                    let json_value: serde_json::Value =
+                        serde_json::from_str(v).unwrap_or_default();
+                    json_value.get(key).map(|v| v.to_string())
+                })
+            })
+            .collect::<StringArray>();
+        Ok(ColumnarValue::Array(Arc::new(values)))
+    }
+
+    fn aliases(&self) -> &[String] {
+        &self.aliases
+    }
+}
+
+/// Rewriter that converts json_get_str calls to direct flat column references

Review Comment:
   Given this example is all about JSON I suggest removing the term Variant for 
the time being
   
   So maybe a better name for this would be `ShreddedJsonRewriter` or 
`JsonGetStrRewriter`



##########
datafusion/physical-expr/src/schema_rewriter.rs:
##########
@@ -29,6 +29,16 @@ use datafusion_common::{
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 
 use crate::expressions::{self, CastExpr, Column};
+pub trait PhysicalExprSchemaRewriteHook: Send + Sync + std::fmt::Debug {

Review Comment:
   BTW should the struct below here that is currently called 
`PhysicalExprSchemaRewriter` implement `PhysicalExprSchemaRewriteHook`? 
   
   It seems like it overlaps somehow with PhysicalExprSchemaRewriteHook 🤔 
   



##########
datafusion-examples/examples/variant_shredding.rs:
##########
@@ -0,0 +1,398 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::array::{RecordBatch, StringArray};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use async_trait::async_trait;
+
+use datafusion::assert_batches_eq;
+use datafusion::catalog::memory::DataSourceExec;
+use datafusion::catalog::{Session, TableProvider};
+use datafusion::common::tree_node::{Transformed, TreeNodeRecursion};
+use datafusion::common::{assert_contains, DFSchema, Result};
+use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::physical_plan::{FileScanConfigBuilder, 
ParquetSource};
+use datafusion::execution::context::SessionContext;
+use datafusion::execution::object_store::ObjectStoreUrl;
+use datafusion::logical_expr::utils::conjunction;
+use datafusion::logical_expr::{
+    ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, 
Signature,
+    TableProviderFilterPushDown, TableType, Volatility,
+};
+use datafusion::parquet::arrow::ArrowWriter;
+use datafusion::parquet::file::properties::WriterProperties;
+use datafusion::physical_expr::schema_rewriter::PhysicalExprSchemaRewriteHook;
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion::physical_expr::{expressions, ScalarFunctionExpr};
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::{lit, SessionConfig};
+use datafusion::scalar::ScalarValue;
+use futures::StreamExt;
+use object_store::memory::InMemory;
+use object_store::path::Path;
+use object_store::{ObjectStore, PutPayload};
+
+// Example showing how to implement custom filter rewriting for variant 
shredding.
+//
+// In this example, we have a table with flat columns using underscore 
prefixes:
+// data: "...", _data.name: "..."

Review Comment:
   Yeah I am not exactly sure what happens if you select just leaf columns from 
a parquet struct with the projection mask
   
   Also for this example I think it is nicer to show how this works without 
*also* the complication of struct columns -- the struct thing is clever but 
really part of the particular parquet implementation of variant rather than 
required for shredding in general



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to