timsaucer commented on code in PR #1541:
URL: https://github.com/apache/datafusion-python/pull/1541#discussion_r3243094087
##########
crates/core/src/codec.rs:
##########
@@ -0,0 +1,286 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Python-aware extension codecs.
+//!
+//! Datafusion-python plans can carry references to Python-defined
+//! objects that the upstream protobuf codecs do not know how to
+//! serialize: pure-Python scalar / aggregate / window UDFs, Python
+//! query-planning extensions, and so on. Their state lives inside
+//! `Py<PyAny>` callables and closures rather than being recoverable
+//! from a name in the receiver's function registry. To ship a plan
+//! across a process boundary (pickle, `multiprocessing`, Ray actor,
+//! `datafusion-distributed`, etc.) those payloads have to be encoded
+//! into the proto wire format itself.
+//!
+//! [`PythonLogicalCodec`] is the [`LogicalExtensionCodec`] that
+//! datafusion-python parks on every `SessionContext`. It wraps a
+//! user-supplied (or default) inner codec and adds Python-aware
+//! in-band encoding on top: when the encoder sees a Python-defined
+//! UDF, the codec cloudpickles the callable + signature into the
+//! `fun_definition` proto field; when the decoder sees a payload it
+//! produced, it reconstructs the UDF from the bytes alone — no
+//! pre-registration on the receiver. UDFs the codec does not
+//! recognise are delegated to `inner`, which is typically
+//! `DefaultLogicalExtensionCodec` but may be a downstream-supplied
+//! FFI codec installed via
+//! `SessionContext.with_logical_extension_codec(...)`.
+//!
+//! [`PythonPhysicalCodec`] is the symmetric wrapper around
+//! [`PhysicalExtensionCodec`]. Logical and physical layers each have
+//! a `try_encode_udf` / `try_decode_udf` pair, so a `ScalarUDF`
+//! referenced inside a `LogicalPlan`, an `ExecutionPlan`, or a
+//! `PhysicalExpr` must encode identically through either layer for
+//! plans to survive a serialization round-trip. Both codecs share
+//! the same payload framing for that reason.
+//!
+//! Payloads emitted by these codecs are tagged with an 8-byte magic
+//! prefix so the decoder can distinguish them from arbitrary bytes
+//! (empty `fun_definition` from the default codec, user FFI payloads
+//! that picked a non-colliding prefix). Dispatch precedence on
+//! decode: **Python-inline payload (magic prefix match) → `inner`
+//! codec → caller's `FunctionRegistry` fallback.**
+//!
+//! ## Wire-format magic prefix registry
+//!
+//! | Layer + kind                  | Magic prefix |
+//! | ----------------------------- | ------------ |
+//! | `PythonLogicalCodec` scalar   | `DFPYUDF1`   |
+//! | `PythonLogicalCodec` agg      | `DFPYUDA1`   |
+//! | `PythonLogicalCodec` window   | `DFPYUDW1`   |
+//! | `PythonPhysicalCodec` scalar  | `DFPYUDF1`   |
+//! | `PythonPhysicalCodec` agg     | `DFPYUDA1`   |
+//! | `PythonPhysicalCodec` window  | `DFPYUDW1`   |
+//! | `PythonPhysicalCodec` expr    | `DFPYPE1`    |
+//! | User FFI extension codec      | user-chosen  |
+//! | Default codec                 | (none)       |
+//!
+//! Downstream FFI codecs should pick non-colliding prefixes (use a
+//! `DF` namespace plus a crate-specific suffix). The codec
+//! implementations in this module currently delegate every method to
+//! `inner`; the encoder/decoder hooks for each kind are added as the
+//! corresponding Python-side type becomes serializable.
+
+use std::sync::Arc;
+
+use arrow::datatypes::SchemaRef;
+use datafusion::common::{Result, TableReference};
+use datafusion::datasource::TableProvider;
+use datafusion::datasource::file_format::FileFormatFactory;
+use datafusion::execution::TaskContext;
+use datafusion::logical_expr::{AggregateUDF, Extension, LogicalPlan, ScalarUDF, WindowUDF};
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion_proto::logical_plan::{DefaultLogicalExtensionCodec, LogicalExtensionCodec};
+use datafusion_proto::physical_plan::{DefaultPhysicalExtensionCodec, PhysicalExtensionCodec};
+
+/// Wire-format prefix that tags a `fun_definition` payload as an
+/// inlined Python scalar UDF (cloudpickled tuple of name, callable,
+/// input schema, return field, volatility). Defined once here so
+/// the encoder and decoder cannot drift.
+#[allow(dead_code)]
+pub(crate) const PY_SCALAR_UDF_MAGIC: &[u8] = b"DFPYUDF1";

Review Comment:
We allow dead code here so that this constant is ready to go as the preferred wire format. The immediate next PR will use it.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
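The module doc describes a decode precedence: Python-inline payload (magic prefix match), then the `inner` codec, then the `FunctionRegistry` fallback. It also describes the prefix framing that `PY_SCALAR_UDF_MAGIC` exists to support. Both can be sketched in plain Rust.

This is a minimal, std-only illustration under stated assumptions, not the PR's implementation. The names `Decoded`, `InnerCodec`, `NoopInner`, `PrefixInner`, `PythonAwareCodec`, and `frame_python_payload`, and the `DFMYCRATE1` prefix, are all hypothetical. Payloads are plain byte vectors standing in for cloudpickled UDFs. For brevity, the registry lookup is folded into `decode`, even though the module doc attributes that fallback to the caller.

```rust
// Hypothetical, std-only sketch of magic-prefix framing and decode dispatch.
// None of these names are datafusion or datafusion-python APIs.
use std::collections::HashMap;

/// Same bytes as `PY_SCALAR_UDF_MAGIC` in the diff above.
const PY_SCALAR_UDF_MAGIC: &[u8] = b"DFPYUDF1";

/// Which branch of the dispatch produced the UDF.
#[derive(Debug, PartialEq)]
enum Decoded {
    PythonInline(Vec<u8>), // bytes after the prefix
    Inner(String),         // claimed by the wrapped codec
    Registry(String),      // resolved by name
}

/// Stand-in for the wrapped `inner` codec.
trait InnerCodec {
    /// Returns `Some` only if this codec recognises the payload.
    fn try_decode(&self, name: &str, payload: &[u8]) -> Option<String>;
}

/// Recognises nothing, so every non-Python payload falls through.
struct NoopInner;
impl InnerCodec for NoopInner {
    fn try_decode(&self, _name: &str, _payload: &[u8]) -> Option<String> {
        None
    }
}

/// Hypothetical downstream FFI codec that claims its own prefix.
struct PrefixInner(&'static [u8]);
impl InnerCodec for PrefixInner {
    fn try_decode(&self, name: &str, payload: &[u8]) -> Option<String> {
        payload.starts_with(self.0).then(|| format!("ffi:{name}"))
    }
}

/// Encode side: tag an already-serialized body with the magic prefix.
fn frame_python_payload(body: &[u8]) -> Vec<u8> {
    let mut buf = Vec::with_capacity(PY_SCALAR_UDF_MAGIC.len() + body.len());
    buf.extend_from_slice(PY_SCALAR_UDF_MAGIC);
    buf.extend_from_slice(body);
    buf
}

/// Wrapper that checks for its own payloads before delegating to `inner`.
struct PythonAwareCodec<I: InnerCodec> {
    inner: I,
}

impl<I: InnerCodec> PythonAwareCodec<I> {
    /// Dispatch: inline payload, then `inner`, then registry fallback.
    fn decode(
        &self,
        name: &str,
        payload: &[u8],
        registry: &HashMap<String, String>,
    ) -> Result<Decoded, String> {
        if let Some(body) = payload.strip_prefix(PY_SCALAR_UDF_MAGIC) {
            return Ok(Decoded::PythonInline(body.to_vec())); // self-contained, no registry
        }
        if let Some(udf) = self.inner.try_decode(name, payload) {
            return Ok(Decoded::Inner(udf));
        }
        registry
            .get(name)
            .map(|udf| Decoded::Registry(udf.clone()))
            .ok_or_else(|| format!("UDF '{name}' not found"))
    }
}

fn main() {
    let mut registry = HashMap::new();
    registry.insert("my_udf".to_string(), "registered:my_udf".to_string());
    let plain = PythonAwareCodec { inner: NoopInner };
    let ffi = PythonAwareCodec { inner: PrefixInner(b"DFMYCRATE1") };

    // An inline Python payload wins regardless of the inner codec.
    let framed = frame_python_payload(b"pickled-bytes");
    assert_eq!(
        ffi.decode("my_udf", &framed, &registry),
        Ok(Decoded::PythonInline(b"pickled-bytes".to_vec()))
    );
    // The FFI codec claims payloads that carry its own prefix.
    assert_eq!(
        ffi.decode("other", b"DFMYCRATE1...", &registry),
        Ok(Decoded::Inner("ffi:other".to_string()))
    );
    // An empty payload (as from the default codec) falls back to the registry.
    assert_eq!(
        plain.decode("my_udf", b"", &registry),
        Ok(Decoded::Registry("registered:my_udf".to_string()))
    );
    assert!(plain.decode("missing", b"", &registry).is_err());
}
```

Matching with `strip_prefix` rather than slicing a fixed eight bytes also handles prefixes of other lengths. One example is the seven-byte `DFPYPE1` listed in the registry table.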
