andygrove commented on issue #173: URL: https://github.com/apache/datafusion-ballista/issues/173#issuecomment-3785078187
## Implementation Analysis

I asked Claude Code to analyze what would be involved in adding Python UDF support to Ballista. Here are the detailed findings:

---

### Current State

The infrastructure partially exists but isn't fully wired up:

- `python/src/codec.rs` has `try_encode_udf`/`try_decode_udf` stubs that mention cloudpickle but just delegate to the inner codec
- `cloudpickle` is already a dependency in `python/pyproject.toml`
- DataFusion Python supports UDFs locally via the `@udf` decorator
- `ballista/core/src/registry.rs` has `BallistaFunctionRegistry`, but it only supports built-in functions

### Implementation Approach

#### Option 1: Arrow-Optimized UDFs (Recommended)

Based on [PySpark's approach in Spark 3.5+](https://www.databricks.com/blog/arrow-optimized-python-udfs-apache-sparktm-35), use Arrow for serialization instead of pure cloudpickle:

```
┌─────────────────┐     Arrow IPC      ┌─────────────────┐
│  Python Client  │ ──────────────────▶│    Executor     │
│                 │                    │                 │
│   UDF + Data    │ ◀──────────────────│  Python Worker  │
│                 │     Arrow IPC      │                 │
└─────────────────┘                    └─────────────────┘
```

**Benefits**: 1.6-1.9x faster than cloudpickle, standardized type coercion

#### Option 2: Cloudpickle Serialization (Simpler)

Serialize the entire UDF with cloudpickle and send it with the query plan.

---

### Implementation Steps

#### Phase 1: Protobuf & Codec Changes

**1. Extend `ballista.proto`:**

```protobuf
message PythonUDF {
  string name = 1;
  bytes pickled_function = 2;        // cloudpickle.dumps(func)
  bytes return_type = 3;             // Arrow IPC schema
  repeated bytes argument_types = 4;
  bool arrow_optimized = 5;          // Use Arrow serialization
}

message SessionState {
  // ... existing fields
  repeated PythonUDF python_udfs = 10;
}
```

**2. Implement the codec in `python/src/codec.rs`:**

```rust
fn try_encode_udf(&self, udf: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
    Python::with_gil(|py| {
        let cloudpickle = py.import("cloudpickle")?;
        let pickled = cloudpickle.call_method1("dumps", (udf.inner(),))?;
        buf.extend_from_slice(pickled.extract::<&[u8]>()?);
        Ok(())
    })
}
```

#### Phase 2: Executor Changes

**1. Add a Python runtime to the executor:**

```rust
// In executor_server.rs
struct ExecutorState {
    // ... existing fields
    python_udfs: HashMap<String, PyObject>, // deserialized UDFs
}

fn register_python_udf(&mut self, udf_def: &PythonUDF) -> Result<()> {
    Python::with_gil(|py| {
        let cloudpickle = py.import("cloudpickle")?;
        let func = cloudpickle.call_method1("loads", (&udf_def.pickled_function,))?;
        self.python_udfs.insert(udf_def.name.clone(), func.into());
        Ok(())
    })
}
```

**2. Create an Arrow-based UDF executor:**

```rust
fn execute_python_udf(
    &self,
    name: &str,
    args: &[ArrayRef],
) -> Result<ArrayRef> {
    Python::with_gil(|py| {
        // `get` returns an Option, so map a missing name to an error
        let func = self.python_udfs.get(name).ok_or_else(|| {
            DataFusionError::Execution(format!("unknown Python UDF: {name}"))
        })?;
        // Convert Arrow arrays to PyArrow objects
        let py_args = args
            .iter()
            .map(|arr| arrow_to_pyarrow(py, arr))
            .collect::<Result<Vec<PyObject>>>()?;
        // Call the Python function
        let result = func.call1(py, PyTuple::new(py, &py_args))?;
        // Convert the result back to Arrow
        pyarrow_to_arrow(py, result)
    })
}
```
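Note that `arrow_to_pyarrow`/`pyarrow_to_arrow` above are placeholder names, not existing functions; the `arrow` crate does ship `ArrayData` ⇄ PyArrow conversions behind its `pyarrow` feature, which is the natural building block for them. Independent of the Rust wiring, the executor-side flow is easy to validate in pure Python first. The following is a minimal sketch of what `register_python_udf` plus `execute_python_udf` amount to, assuming only `cloudpickle` and `pyarrow` (the UDF and data are made up for illustration):

```python
import cloudpickle
import pyarrow as pa
import pyarrow.compute as pc

# --- client side: what try_encode_udf would put on the wire ---
def celsius_to_fahrenheit(c: pa.Array) -> pa.Array:
    # vectorized over a pyarrow array, as an Arrow-optimized UDF would be
    return pc.add(pc.multiply(c, 9 / 5), 32.0)

payload = cloudpickle.dumps(celsius_to_fahrenheit)  # bytes in the task definition

# --- executor side: what register_python_udf / execute_python_udf would do ---
udf = cloudpickle.loads(payload)        # rebuild the function from bytes
column = pa.array([0.0, 100.0, 37.0])   # stand-in for a column received via Arrow
result = udf(column)
assert [round(v, 1) for v in result.to_pylist()] == [32.0, 212.0, 98.6]
```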
#### Phase 3: Python API

**1. Add a registration method:**

```python
# ballista/__init__.py
class BallistaSessionContext:
    def register_udf(self, func, name=None, return_type=None):
        """Register a Python UDF for distributed execution."""
        import cloudpickle

        udf_def = {
            'name': name or func.__name__,
            'pickled': cloudpickle.dumps(func),
            'return_type': return_type,
        }
        self._registered_udfs[udf_def['name']] = udf_def
        # Also register locally for planning
        self._inner.register_udf(func, name, return_type)
```

**2. Example usage:**

```python
from ballista import BallistaSessionContext
from datafusion import udf
import pyarrow as pa

@udf(return_type=pa.float64(), input_types=[pa.float64()])
def celsius_to_fahrenheit(c):
    return c * 9 / 5 + 32

ctx = BallistaSessionContext("df://localhost:50050")
ctx.register_udf(celsius_to_fahrenheit)

df = ctx.sql("SELECT celsius_to_fahrenheit(temp) FROM weather")
df.show()
```

---

### Technical Challenges

| Challenge | Solution |
|-----------|----------|
| **UDF dependencies** | Require the same packages on all executors, or use container images |
| **Python version mismatch** | Validate the version at registration; fail fast |
| **Pickle security** | Optional allowlist of permitted modules |
| **GIL contention** | Use a separate Python process per executor, communicating via Arrow IPC |
| **Large UDF size** | Cache UDFs by hash; only send once per session |

---

### Architecture Options

#### Option A: Embedded Python (Simpler)

- Python interpreter embedded in the Rust executor via PyO3
- Lower latency, simpler deployment
- GIL limits parallelism within a single executor

#### Option B: Sidecar Process (More Scalable)

```
┌─────────────┐   Arrow IPC    ┌─────────────┐
│  Executor   │◀──────────────▶│   Python    │
│   (Rust)    │  Unix Socket   │   Worker    │
└─────────────┘                └─────────────┘
```

- Separate Python process per executor
- Better parallelism, isolated failures
- More complex deployment
- A minimal prototype of this loop is sketched at the end of this comment

#### Option C: Arrow Flight UDF Service

- Dedicated UDF execution service
- Executors call the service via Flight RPC
- Best for GPU/specialized hardware

---

### Key Files to Modify

| Component | File Path | Purpose |
|-----------|-----------|---------|
| Function Registry | `ballista/core/src/registry.rs` | Add custom UDF storage |
| Codec (Python) | `python/src/codec.rs` | Implement cloudpickle serialization |
| Task Definition | `ballista/core/src/serde/scheduler/from_proto.rs` | Deserialize UDFs |
| Executor | `ballista/executor/src/executor_server.rs` | Execute Python UDFs |
| Protobuf | `ballista/core/proto/ballista.proto` | Add UDF message types |
| Python API | `python/python/ballista/__init__.py` | Add `register_udf()` |

---

### Recommended Implementation Path

1. **Phase 1** (MVP): Standalone mode only, embedded Python, cloudpickle
2. **Phase 2**: Distributed mode with UDF caching
3. **Phase 3**: Arrow-optimized serialization for performance
4. **Phase 4**: Sidecar process option for production workloads

### Estimated Effort

| Component | Effort |
|-----------|--------|
| Protobuf changes | 1-2 days |
| Codec implementation | 3-5 days |
| Executor integration | 5-7 days |
| Python API | 2-3 days |
| Testing & docs | 3-5 days |
| **Total MVP** | **2-3 weeks** |

---

### References

- [Arrow-optimized Python UDFs in Apache Spark 3.5](https://www.databricks.com/blog/arrow-optimized-python-udfs-apache-sparktm-35) - PySpark's approach
- [PySpark UDF documentation](https://spark.apache.org/docs/latest/api/python/user_guide/udfandudtf.html)
- [LakeSail Sail](https://github.com/lakehq/sail) - DataFusion-based project with Python UDF support via Spark Connect
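---

As a closing sketch for Option B above: the sidecar loop needs little more than a Unix socket and Arrow IPC streams. Everything here is an illustrative assumption, not an existing Ballista protocol; the socket path, the first-column-in/one-column-out convention, and the `"result"` output schema are all placeholders:

```python
import socket
import cloudpickle
import pyarrow as pa

SOCKET_PATH = "/tmp/ballista-python-worker.sock"  # hypothetical path, illustration only

def serve_one_udf(pickled_udf: bytes) -> None:
    """Apply a single UDF to record batches streamed over a Unix socket."""
    udf = cloudpickle.loads(pickled_udf)
    server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    server.bind(SOCKET_PATH)
    server.listen(1)
    conn, _ = server.accept()
    with conn.makefile("rb") as rf, conn.makefile("wb") as wf:
        reader = pa.ipc.open_stream(rf)   # executor -> worker: input batches
        writer = None
        for batch in reader:
            out = udf(batch.column(0))    # convention: first column in, one column out
            if writer is None:
                # the output schema is only known once the UDF has produced a value
                writer = pa.ipc.new_stream(wf, pa.schema([("result", out.type)]))
            writer.write_batch(pa.record_batch([out], names=["result"]))
        if writer is not None:
            writer.close()
```

Per-batch framing like this keeps the worker stateless; the UDF caching by hash from the challenges table would sit in front of `cloudpickle.loads`.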
