adriangb commented on code in PR #19437:
URL: https://github.com/apache/datafusion/pull/19437#discussion_r2644641468


##########
datafusion-examples/examples/custom_data_source/adapter_serialization.rs:
##########
@@ -0,0 +1,527 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! See `main.rs` for how to run it.
+//!
+//! This example demonstrates how to use the `PhysicalExtensionCodec` trait's
+//! interception methods (`serialize_physical_plan` and `deserialize_physical_plan`)
+//! to implement custom serialization logic.
+//!
+//! The key insight is that `FileScanConfig::expr_adapter_factory` is NOT serialized by
+//! default. This example shows how to:
+//! 1. Detect plans with custom adapters during serialization
+//! 2. Wrap them as Extension nodes with JSON-serialized adapter metadata
+//! 3. Unwrap and restore the adapter during deserialization
+//!
+//! This demonstrates nested serialization (protobuf outer, JSON inner) and the power
+//! of the `PhysicalExtensionCodec` interception pattern. Both plan and expression
+//! serialization route through the codec, enabling interception at every node in the tree.
+
+use std::fmt::Debug;
+use std::sync::Arc;
+
+use arrow::array::record_batch;
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion::assert_batches_eq;
+use datafusion::common::Result;
+use datafusion::common::not_impl_err;
+use datafusion::datasource::listing::{
+    ListingTable, ListingTableConfig, ListingTableConfigExt, ListingTableUrl,
+};
+use datafusion::datasource::physical_plan::{FileScanConfig, FileScanConfigBuilder};
+use datafusion::datasource::source::DataSourceExec;
+use datafusion::execution::TaskContext;
+use datafusion::execution::context::SessionContext;
+use datafusion::execution::object_store::ObjectStoreUrl;
+use datafusion::parquet::arrow::ArrowWriter;
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::SessionConfig;
+use datafusion_physical_expr_adapter::{
+    DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory,
+};
+use datafusion_proto::bytes::{
+    physical_plan_from_bytes_with_extension_codec,
+    physical_plan_to_bytes_with_extension_codec,
+};
+use datafusion_proto::physical_plan::{
+    AsExecutionPlan, PhysicalExtensionCodec, PhysicalExtensionProtoCodec,
+};
+use datafusion_proto::protobuf::{
+    PhysicalExprNode, PhysicalExtensionNode, PhysicalPlanNode,
+    physical_plan_node::PhysicalPlanType,
+};
+use object_store::memory::InMemory;
+use object_store::path::Path;
+use object_store::{ObjectStore, PutPayload};
+use prost::Message;
+use serde::{Deserialize, Serialize};
+
+/// Example showing how to preserve custom adapter information during plan serialization.
+///
+/// This demonstrates:
+/// 1. Creating a custom PhysicalExprAdapter with metadata
+/// 2. Using PhysicalExtensionCodec to intercept serialization
+/// 3. Wrapping adapter info as Extension nodes
+/// 4. Restoring adapters during deserialization
+pub async fn adapter_serialization() -> Result<()> {
+    println!("=== PhysicalExprAdapter Serialization Example ===\n");
+
+    // Step 1: Create sample Parquet data in memory
+    println!("Step 1: Creating sample Parquet data...");
+    let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+    let batch = record_batch!(("id", Int32, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]))?;
+    let path = Path::from("data.parquet");
+    write_parquet(&store, &path, &batch).await?;
+
+    // Step 2: Set up session with custom adapter
+    println!("Step 2: Setting up session with custom adapter...");
+    let logical_schema =
+        Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
+
+    let mut cfg = SessionConfig::new();
+    cfg.options_mut().execution.parquet.pushdown_filters = true;
+    let ctx = SessionContext::new_with_config(cfg);
+    ctx.runtime_env().register_object_store(
+        ObjectStoreUrl::parse("memory://")?.as_ref(),
+        Arc::clone(&store),
+    );
+
+    // Create a table with our custom MetadataAdapterFactory
+    let adapter_factory = Arc::new(MetadataAdapterFactory::new("v1"));
+    let listing_config =
+        ListingTableConfig::new(ListingTableUrl::parse("memory:///data.parquet")?)
+            .infer_options(&ctx.state())
+            .await?
+            .with_schema(logical_schema)
+            .with_expr_adapter_factory(
+                Arc::clone(&adapter_factory) as Arc<dyn PhysicalExprAdapterFactory>
+            );
+    let table = ListingTable::try_new(listing_config)?;
+    ctx.register_table("my_table", Arc::new(table))?;
+
+    // Step 3: Create physical plan with filter
+    println!("Step 3: Creating physical plan with filter...");
+    let df = ctx.sql("SELECT * FROM my_table WHERE id > 5").await?;
+    let original_plan = df.create_physical_plan().await?;
+
+    // Verify adapter is present in original plan
+    let has_adapter_before = verify_adapter_in_plan(&original_plan, "original");
+    println!("  Original plan has adapter: {has_adapter_before}");
+
+    // Step 4: Serialize with our custom codec
+    println!("\nStep 4: Serializing plan with AdapterPreservingCodec...");
+    let codec = AdapterPreservingCodec;
+    let bytes = physical_plan_to_bytes_with_extension_codec(
+        Arc::clone(&original_plan),
+        &codec,
+        &codec,
+    )?;
+    println!("  Serialized {} bytes", bytes.len());
+    println!("  (DataSourceExec with adapter was wrapped as 
PhysicalExtensionNode)");
+
+    // Step 5: Deserialize with our custom codec
+    println!("\nStep 5: Deserializing plan with AdapterPreservingCodec...");
+    let task_ctx = ctx.task_ctx();
+    let restored_plan =
+        physical_plan_from_bytes_with_extension_codec(&bytes, &task_ctx, &codec, &codec)?;
+
+    // Verify adapter is restored
+    let has_adapter_after = verify_adapter_in_plan(&restored_plan, "restored");
+    println!("  Restored plan has adapter: {has_adapter_after}");
+
+    // Step 6: Execute and compare results
+    println!("\nStep 6: Executing plans and comparing results...");
+    let original_results =
+        datafusion::physical_plan::collect(Arc::clone(&original_plan), task_ctx.clone())
+            .await?;
+    let restored_results =
+        datafusion::physical_plan::collect(restored_plan, task_ctx).await?;
+
+    #[rustfmt::skip]
+    let expected = [
+        "+----+",
+        "| id |",
+        "+----+",
+        "| 6  |",
+        "| 7  |",
+        "| 8  |",
+        "| 9  |",
+        "| 10 |",
+        "+----+",
+    ];
+
+    println!("\n  Original plan results:");
+    arrow::util::pretty::print_batches(&original_results)?;
+    assert_batches_eq!(expected, &original_results);
+
+    println!("\n  Restored plan results:");
+    arrow::util::pretty::print_batches(&restored_results)?;
+    assert_batches_eq!(expected, &restored_results);
+
+    println!("\n=== Example Complete! ===");
+    println!("Key takeaways:");
+    println!(
+        "  1. PhysicalExtensionCodec provides 
serialize_physical_plan/deserialize_physical_plan hooks"
+    );
+    println!("  2. Custom metadata can be wrapped as PhysicalExtensionNode");
+    println!("  3. Nested serialization (protobuf + JSON) works seamlessly");
+    println!(
+        "  4. Both plans produce identical results despite serialization 
round-trip"
+    );
+    println!("  5. Adapters are fully preserved through the serialization 
round-trip");
+
+    Ok(())
+}
+
+// ============================================================================
+// MetadataAdapter - A simple custom adapter with a tag
+// ============================================================================
+
+/// A custom PhysicalExprAdapter that wraps another adapter.
+/// The tag metadata is stored in the factory, not the adapter itself.
+#[derive(Debug)]
+struct MetadataAdapter {
+    inner: Arc<dyn PhysicalExprAdapter>,
+}
+
+impl PhysicalExprAdapter for MetadataAdapter {
+    fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
+        // Simply delegate to inner adapter
+        self.inner.rewrite(expr)
+    }
+}
+
+// ============================================================================
+// MetadataAdapterFactory - Factory for creating MetadataAdapter instances
+// ============================================================================
+
+/// Factory for creating MetadataAdapter instances.
+/// The tag is stored in the factory and extracted via Debug formatting in `extract_adapter_tag`.
+#[derive(Debug)]
+struct MetadataAdapterFactory {
+    // Note: This field is read via Debug formatting in `extract_adapter_tag`.
+    // Rust's dead code analysis doesn't recognize Debug-based field access.
+    // In PR #19234, this field is used by `with_partition_values`, but that method
+    // doesn't exist in upstream DataFusion's PhysicalExprAdapter trait.
+    #[allow(dead_code)]
+    tag: String,
+}
+
+impl MetadataAdapterFactory {
+    fn new(tag: impl Into<String>) -> Self {
+        Self { tag: tag.into() }
+    }
+}
+
+impl PhysicalExprAdapterFactory for MetadataAdapterFactory {
+    fn create(
+        &self,
+        logical_file_schema: SchemaRef,
+        physical_file_schema: SchemaRef,
+    ) -> Arc<dyn PhysicalExprAdapter> {
+        let inner = DefaultPhysicalExprAdapterFactory
+            .create(logical_file_schema, physical_file_schema);
+        Arc::new(MetadataAdapter { inner })
+    }
+}
+
+// ============================================================================
+// AdapterPreservingCodec - Custom codec that preserves adapters
+// ============================================================================
+
+/// Extension payload structure for serializing adapter info
+#[derive(Serialize, Deserialize)]
+struct ExtensionPayload {
+    /// Marker to identify this is our custom extension
+    marker: String,
+    /// JSON-serialized adapter metadata
+    adapter_metadata: AdapterMetadata,
+    /// Protobuf-serialized inner DataSourceExec (without adapter)
+    inner_plan_bytes: Vec<u8>,
+}
+
+/// Metadata about the adapter to recreate it during deserialization
+#[derive(Serialize, Deserialize)]
+struct AdapterMetadata {
+    /// The adapter tag (e.g., "v1")
+    tag: String,
+}
+
+const EXTENSION_MARKER: &str = "adapter_preserving_extension_v1";
+
+/// A codec that intercepts serialization to preserve adapter information.
+#[derive(Debug)]
+struct AdapterPreservingCodec;
+
+impl PhysicalExtensionCodec for AdapterPreservingCodec {
+    // Required method: decode custom extension nodes
+    fn try_decode(
+        &self,
+        buf: &[u8],
+        _inputs: &[Arc<dyn ExecutionPlan>],

Review Comment:
   Curious if we could use `inputs` here instead of making that go out of band
and hence "breaking" the serialization tree? I.e. have a custom node but have
everything below it be "standard".
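
   For illustration, a rough (untested) sketch of what `try_decode` could look
like if the child stayed a standard node; `attach_adapter_factory` is a
hypothetical helper that rebuilds the `DataSourceExec` with the factory
restored from the tag:

```rust
fn try_decode(
    &self,
    buf: &[u8],
    inputs: &[Arc<dyn ExecutionPlan>],
    _registry: &dyn FunctionRegistry,
) -> Result<Arc<dyn ExecutionPlan>> {
    // Only the adapter metadata travels in the extension payload; the
    // DataSourceExec below it went through the standard protobuf path
    // and arrives here already decoded as the sole input.
    let metadata: AdapterMetadata = serde_json::from_slice(buf)
        .map_err(|e| DataFusionError::Internal(e.to_string()))?;
    let [child] = inputs else {
        return internal_err!("expected exactly one input");
    };
    // Hypothetical helper: re-attach a MetadataAdapterFactory recreated
    // from `metadata.tag` to the decoded DataSourceExec.
    attach_adapter_factory(Arc::clone(child), &metadata.tag)
}
```

   That would also shrink the extension payload to just the JSON metadata,
since `inner_plan_bytes` goes away.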



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

