andygrove commented on issue #173:
URL: https://github.com/apache/datafusion-ballista/issues/173#issuecomment-3785078187

   ## Implementation Analysis
   
   I asked Claude Code to analyze what would be involved in adding Python UDF 
support to Ballista. Here are the detailed findings:
   
   ---
   
   ### Current State
   
   The infrastructure partially exists but isn't fully wired up:
   - `python/src/codec.rs` has `try_encode_udf`/`try_decode_udf` stubs that mention cloudpickle but currently just delegate to the inner codec
   - `cloudpickle` is already a dependency in `python/pyproject.toml`
   - DataFusion Python supports UDFs locally via the `@udf` decorator
   - `ballista/core/src/registry.rs` has a `BallistaFunctionRegistry`, but it only supports built-in functions
   
   ### Implementation Approach
   
   #### Option 1: Arrow-Optimized UDFs (Recommended)
   
   Based on [PySpark's approach in Spark 3.5+](https://www.databricks.com/blog/arrow-optimized-python-udfs-apache-sparktm-35), use Arrow for serialization instead of pure cloudpickle:
   
   ```
   ┌─────────────────┐     Arrow IPC      ┌─────────────────┐
   │  Python Client  │ ──────────────────▶│    Executor     │
   │                 │                    │                 │
   │  UDF + Data     │◀────────────────── │  Python Worker  │
   │                 │     Arrow IPC      │                 │
   └─────────────────┘                    └─────────────────┘
   ```
   
   **Benefits**: 1.6-1.9x faster than pickle-based UDF execution in the Spark 3.5 benchmarks linked above, plus standardized type coercion
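
   To make the transport concrete, here is a minimal PyArrow sketch (not Ballista code) of the IPC round trip the diagram describes:

   ```python
   import pyarrow as pa

   # A batch of UDF arguments, serialized to the Arrow IPC streaming
   # format as on the sending side of the diagram.
   batch = pa.record_batch([pa.array([1.0, 2.0, 3.0])], names=["temp"])
   sink = pa.BufferOutputStream()
   with pa.ipc.new_stream(sink, batch.schema) as writer:
       writer.write_batch(batch)
   payload = sink.getvalue()  # bytes handed to the Python worker

   # Receiving side: reconstruct the batch without per-row copying.
   received = pa.ipc.open_stream(payload).read_all()
   assert received.column("temp").to_pylist() == [1.0, 2.0, 3.0]
   ```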
   
   #### Option 2: Cloudpickle Serialization (Simpler)
   
   Serialize the entire UDF with cloudpickle and ship it alongside the query plan.
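
   The whole mechanism Option 2 leans on fits in a few lines; unlike the standard `pickle`, cloudpickle serializes the function body itself (including lambdas and captured state), so the executor does not need the client's source code on its import path:

   ```python
   import cloudpickle

   offset = 32.0  # referenced state travels with the function

   def celsius_to_fahrenheit(c):
       return c * 9 / 5 + offset

   # Client side: serialize the function itself, not a reference to it.
   payload = cloudpickle.dumps(celsius_to_fahrenheit)

   # Executor side: rebuild and call it (still requires a compatible
   # Python version and the same third-party dependencies installed).
   func = cloudpickle.loads(payload)
   assert func(100.0) == 212.0
   ```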
   
   ---
   
   ### Implementation Steps
   
   #### Phase 1: Protobuf & Codec Changes
   
   **1. Extend `ballista.proto`:**
   ```protobuf
   message PythonUDF {
     string name = 1;
     bytes pickled_function = 2;  // cloudpickle.dumps(func)
     bytes return_type = 3;       // Arrow IPC schema
     repeated bytes argument_types = 4;
     bool arrow_optimized = 5;    // Use Arrow serialization
   }
   
   message SessionState {
     // ... existing fields
     repeated PythonUDF python_udfs = 10;
   }
   ```
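
   The `return_type` and `argument_types` fields above can be carried as serialized Arrow schemas. A sketch of how those bytes might be produced and decoded with PyArrow (wrapping the type in a single-field schema; the field name is irrelevant):

   ```python
   import pyarrow as pa

   # Client side: serialize the UDF's return type via an IPC schema.
   return_type_bytes = pa.schema([("ret", pa.float64())]).serialize().to_pybytes()

   # Executor side: recover the DataType from the serialized schema.
   decoded = pa.ipc.read_schema(pa.py_buffer(return_type_bytes))
   assert decoded.field(0).type == pa.float64()
   ```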
   
   **2. Implement codec in `python/src/codec.rs`:**
   ```rust
   fn try_encode_udf(&self, udf: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
       Python::with_gil(|py| -> PyResult<()> {
           // Pickle the Python callable backing this UDF; `udf.inner()`
           // stands in for however the Python-backed implementation
           // exposes its underlying callable.
           let cloudpickle = py.import("cloudpickle")?;
           let pickled = cloudpickle.call_method1("dumps", (udf.inner(),))?;
           buf.extend_from_slice(pickled.extract::<&[u8]>()?);
           Ok(())
       })
       .map_err(|e| DataFusionError::Execution(e.to_string()))
   }
   ```
   
   #### Phase 2: Executor Changes
   
   **1. Add Python runtime to executor:**
   ```rust
   // In executor_server.rs
   struct ExecutorState {
       // ... existing
       python_udfs: HashMap<String, PyObject>,  // Deserialized UDFs
   }
   
   fn register_python_udf(&mut self, udf_def: &PythonUDF) -> Result<()> {
       Python::with_gil(|py| -> PyResult<()> {
           let cloudpickle = py.import("cloudpickle")?;
           // Rebuild the callable from the bytes shipped with the plan.
           let payload = PyBytes::new(py, &udf_def.pickled_function);
           let func = cloudpickle.call_method1("loads", (payload,))?;
           self.python_udfs.insert(udf_def.name.clone(), func.into());
           Ok(())
       })
       .map_err(|e| DataFusionError::Execution(e.to_string()))
   }
   ```
   
   **2. Create Arrow-based UDF executor:**
   ```rust
   fn execute_python_udf(
       &self,
       name: &str,
       args: &[ArrayRef],
   ) -> Result<ArrayRef> {
       Python::with_gil(|py| {
           let func = self.python_udfs.get(name).ok_or_else(|| {
               DataFusionError::Execution(format!("unknown Python UDF: {name}"))
           })?;

           // Convert the Arrow argument arrays to PyArrow arrays
           // (conversion helpers elided here).
           let py_args = args
               .iter()
               .map(|arr| arrow_to_pyarrow(py, arr))
               .collect::<Result<Vec<PyObject>>>()?;

           // Call the deserialized Python function.
           let result = func
               .call1(py, PyTuple::new(py, &py_args))
               .map_err(|e| DataFusionError::Execution(e.to_string()))?;

           // Convert the returned PyArrow array back to Arrow.
           pyarrow_to_arrow(py, result)
       })
   }
   ```
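
   Once the arguments arrive as PyArrow arrays, the Python-side call is a vectorized function over whole arrays rather than per-row values. A sketch of the contract `execute_python_udf` would rely on:

   ```python
   import pyarrow as pa
   import pyarrow.compute as pc

   # The UDF receives whole PyArrow arrays and returns one, which the
   # executor converts back to an Arrow ArrayRef.
   def celsius_to_fahrenheit(c: pa.Array) -> pa.Array:
       return pc.add(pc.multiply(c, 9 / 5), 32)

   result = celsius_to_fahrenheit(pa.array([0.0, 100.0]))
   assert result.to_pylist() == [32.0, 212.0]
   ```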
   
   #### Phase 3: Python API
   
   **1. Add registration method:**
   ```python
   # ballista/__init__.py
   class BallistaSessionContext:
       def register_udf(self, func, name=None, return_type=None):
           """Register a Python UDF for distributed execution."""
           import cloudpickle
           
           udf_def = {
               'name': name or func.__name__,
               'pickled': cloudpickle.dumps(func),
               'return_type': return_type,
           }
           self._registered_udfs[udf_def['name']] = udf_def
           
           # Also register locally for planning
           self._inner.register_udf(func, name, return_type)
   ```
   
   **2. Example usage:**
   ```python
   from ballista import BallistaSessionContext
   from datafusion import udf
   import pyarrow as pa
   
   @udf(return_type=pa.float64(), input_types=[pa.float64()])
   def celsius_to_fahrenheit(c):
       return c * 9/5 + 32
   
   ctx = BallistaSessionContext("df://localhost:50050")
   ctx.register_udf(celsius_to_fahrenheit)
   
   df = ctx.sql("SELECT celsius_to_fahrenheit(temp) FROM weather")
   df.show()
   ```
   
   ---
   
   ### Technical Challenges
   
   | Challenge | Solution |
   |-----------|----------|
   | **UDF dependencies** | Require the same packages on all executors, or use container images |
   | **Python version mismatch** | Validate the version at registration, fail fast |
   | **Pickle security** | Optional allowlist of permitted modules |
   | **GIL contention** | Use a separate Python process per executor, communicating via Arrow IPC |
   | **Large UDF size** | Cache UDFs by content hash so each is sent once per session (sketched below) |
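
   A sketch of the client-side half of the caching idea in the last row (the class and method names are hypothetical, not existing Ballista APIs):

   ```python
   import hashlib

   import cloudpickle

   class UdfCache:
       """Ship each pickled UDF to the cluster at most once per session."""

       def __init__(self):
           self._sent = set()

       def payload_for(self, func):
           pickled = cloudpickle.dumps(func)
           key = hashlib.sha256(pickled).hexdigest()
           if key in self._sent:
               return key, None  # executors already hold this UDF
           self._sent.add(key)
           return key, pickled  # first use: include bytes in the plan
   ```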
   
   ---
   
   ### Architecture Options
   
   #### Option A: Embedded Python (Simpler)
   - Python interpreter embedded in Rust executor via PyO3
   - Lower latency, simpler deployment
   - GIL limits parallelism within single executor
   
   #### Option B: Sidecar Process (More Scalable)
   ```
   ┌─────────────┐    Arrow IPC    ┌─────────────┐
   │  Executor   │◀──────────────▶│  Python     │
   │  (Rust)     │    Unix Socket  │  Worker     │
   └─────────────┘                 └─────────────┘
   ```
   - Separate Python process per executor
   - Better parallelism, isolated failures
   - More complex deployment
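
   A minimal sketch of the worker half of Option B, assuming a hypothetical socket path and one UDF per connection; the Rust executor would hold the other end and frame batches with Arrow IPC:

   ```python
   import socket

   import pyarrow as pa

   SOCKET_PATH = "/tmp/ballista-udf.sock"  # hypothetical path

   def serve(udf, out_type: pa.DataType):
       srv = socket.socket(socket.AF_UNIX)
       srv.bind(SOCKET_PATH)
       srv.listen(1)
       conn, _ = srv.accept()
       rw = conn.makefile("rwb")

       # Argument batches stream in from the executor as Arrow IPC;
       # result batches stream back on the same socket.
       reader = pa.ipc.open_stream(rw)
       out_schema = pa.schema([("result", out_type)])
       writer = pa.ipc.new_stream(rw, out_schema)
       for batch in reader:
           result = udf(*batch.columns)
           writer.write_batch(pa.record_batch([result], schema=out_schema))
           rw.flush()
       writer.close()
   ```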
   
   #### Option C: Arrow Flight UDF Service
   - Dedicated UDF execution service
   - Executors call service via Flight RPC
   - Best for GPU/specialized hardware
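
   A skeleton of what Option C could look like using PyArrow Flight's `DoExchange`, which streams argument batches in and result batches out (the service layout here is illustrative, not an existing Ballista component):

   ```python
   import pyarrow as pa
   import pyarrow.flight as flight

   class UdfService(flight.FlightServerBase):
       """Illustrative UDF service; executors call it via DoExchange."""

       def __init__(self, udfs, location="grpc://0.0.0.0:8815"):
           super().__init__(location)
           self.udfs = udfs  # name -> callable over PyArrow arrays

       def do_exchange(self, context, descriptor, reader, writer):
           # The flight descriptor path names the UDF to run.
           udf = self.udfs[descriptor.path[0].decode()]
           started = False
           for chunk in reader:
               result = udf(*chunk.data.columns)
               if not started:
                   out_schema = pa.schema([("result", result.type)])
                   writer.begin(out_schema)
                   started = True
               writer.write_batch(pa.record_batch([result], schema=out_schema))
   ```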
   
   ---
   
   ### Key Files to Modify
   
   | Component | File Path | Purpose |
   |-----------|-----------|---------|
   | Function Registry | `ballista/core/src/registry.rs` | Add custom UDF storage |
   | Codec (Python) | `python/src/codec.rs` | Implement cloudpickle serialization |
   | Task Definition | `ballista/core/src/serde/scheduler/from_proto.rs` | Deserialize UDFs |
   | Executor | `ballista/executor/src/executor_server.rs` | Execute Python UDFs |
   | Protobuf | `ballista/core/proto/ballista.proto` | Add UDF message types |
   | Python API | `python/python/ballista/__init__.py` | Add `register_udf()` |
   
   ---
   
   ### Recommended Implementation Path
   
   1. **Phase 1** (MVP): Standalone mode only, embedded Python, cloudpickle
   2. **Phase 2**: Distributed mode with UDF caching
   3. **Phase 3**: Arrow-optimized serialization for performance
   4. **Phase 4**: Sidecar process option for production workloads
   
   ### Estimated Effort
   
   | Component | Effort |
   |-----------|--------|
   | Protobuf changes | 1-2 days |
   | Codec implementation | 3-5 days |
   | Executor integration | 5-7 days |
   | Python API | 2-3 days |
   | Testing & docs | 3-5 days |
   | **Total MVP** | **2-3 weeks** |
   
   ---
   
   ### References
   
   - [Arrow-optimized Python UDFs in Apache Spark 3.5](https://www.databricks.com/blog/arrow-optimized-python-udfs-apache-sparktm-35) - PySpark's approach
   - [PySpark UDF documentation](https://spark.apache.org/docs/latest/api/python/user_guide/udfandudtf.html)
   - [LakeSail Sail](https://github.com/lakehq/sail) - a DataFusion-based project with Python UDF support via Spark Connect

