davidzollo opened a new issue, #10460:
URL: https://github.com/apache/seatunnel/issues/10460

   ## Is your feature request related to a problem? Please describe
   
   Currently, the SQL UDF mechanism (`ZetaUDF`) in SeaTunnel Transform is stateless and isolated: a UDF computes its result purely from the arguments passed to it. In complex production scenarios, this design has several limitations:
   
   1.  **Lack of Context Access**: UDFs cannot access row-level metadata such 
as the source `database`, `table` name, `rowKind` (INSERT/UPDATE/DELETE), or 
other fields in the row that are not explicitly passed as arguments.
   2.  **No Lifecycle Management**: There are no `open()` or `close()` hooks. 
This makes it impossible to initialize expensive resources (e.g., KMS clients, 
database connections, cache loaders) once and reuse them. Users are forced to 
initialize resources per row or use static singletons, which are hard to manage 
and test.
   3.  **Limited Stateful Capabilities**: Logic that depends on the execution context (e.g., encrypting data with a key derived from the table name) cannot currently be expressed as a UDF.
   
   ## Describe the solution you'd like
   
   I propose enhancing the `ZetaUDF` interface and the SQL execution engine to 
support **Context-Aware UDFs** and **Lifecycle Management**, while maintaining 
**100% backward compatibility** with existing UDFs.
   
   ### 1. `ZetaUDF` Interface Extension
   
   Introduce new `default` methods to the `ZetaUDF` interface. Existing UDFs do 
not need to change.
   
   ```java
   public interface ZetaUDF extends Serializable {
       // ... existing methods ...
   
       /**
        * Indicate whether this UDF requires row-level context.
        * Default returns false for backward compatibility.
        */
       default boolean requiresContext() {
           return false;
       }
   
       /**
        * Evaluate the function with context.
        * Default implementation falls back to evaluate(args).
        */
       default Object evaluateWithContext(List<Object> args, ZetaUDFContext context) {
           return evaluate(args);
       }
   
       /**
        * Initialize resources. Called once during engine startup.
        */
       default void open() throws Exception {}
   
       /**
        * Release resources. Called once during engine shutdown.
        */
       default void close() {}
   }
   ```
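   To illustrate the backward-compatibility argument, here is a minimal, self-contained sketch. It uses trimmed-down stand-ins for the proposed types: existing `ZetaUDF` methods other than `evaluate` are omitted, and `ZetaUDFContext` is modeled as a one-method interface only to keep the sketch short. `UpperUDF` and `TablePrefixUDF` are hypothetical example UDFs.

   ```java
   import java.util.List;
   import java.util.Locale;

   // Hypothetical, trimmed-down stand-ins for the proposed types. Existing
   // ZetaUDF methods other than evaluate() are omitted, and ZetaUDFContext is a
   // one-method interface here only to keep the sketch short.
   interface ZetaUDFContext {
       String getRawTableId();
   }

   interface ZetaUDF {
       Object evaluate(List<Object> args); // existing method, unchanged

       default boolean requiresContext() {
           return false;
       }

       default Object evaluateWithContext(List<Object> args, ZetaUDFContext context) {
           return evaluate(args); // legacy UDFs land here
       }

       default void open() throws Exception {}

       default void close() {}
   }

   // A UDF written before the extension: it only implements evaluate().
   class UpperUDF implements ZetaUDF {
       @Override
       public Object evaluate(List<Object> args) {
           Object value = args.get(0);
           return value == null ? null : value.toString().toUpperCase(Locale.ROOT);
       }
   }

   // A context-aware UDF opts in by overriding the two new methods.
   class TablePrefixUDF implements ZetaUDF {
       @Override
       public Object evaluate(List<Object> args) {
           throw new UnsupportedOperationException("TablePrefixUDF requires row context");
       }

       @Override
       public boolean requiresContext() {
           return true;
       }

       @Override
       public Object evaluateWithContext(List<Object> args, ZetaUDFContext context) {
           return context.getRawTableId() + ":" + args.get(0);
       }
   }
   ```

   A legacy UDF never overrides the new methods, so `evaluateWithContext` on it simply delegates to `evaluate`, and `open()`/`close()` are no-ops.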
   
   ### 2. Introduce `ZetaUDFContext`
   
   A lightweight, mutable context class that provides access to the runtime 
environment.
   
   *   **Metadata Access**: `getRawTableId()`, `getDatabase()`, `getTable()`, 
`getRowKind()`.
   *   **Data Access**: `getAllFields()` (read-only access to the full row).
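   A possible shape for `ZetaUDFContext`, as a sketch only: the `update(...)` signature, the `String`-typed row kind, and the assumption that raw table ids look like `database.table` or `database.schema.table` are guesses for illustration, not existing SeaTunnel API. In the engine these values would presumably be taken from the row being processed.

   ```java
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.List;

   // Hypothetical sketch of the proposed mutable context. The update(...) signature,
   // String-typed row kind and "database[.schema].table" id format are assumptions.
   final class ZetaUDFContext {
       private String rawTableId;
       private String rowKind;
       private Object[] fields;

       // Called by the engine before each row: reference assignments only, no allocation.
       void update(String rawTableId, String rowKind, Object[] fields) {
           this.rawTableId = rawTableId;
           this.rowKind = rowKind;
           this.fields = fields;
       }

       public String getRawTableId() {
           return rawTableId;
       }

       public String getRowKind() {
           return rowKind;
       }

       // First segment of the id, or null if the id has no database part.
       public String getDatabase() {
           if (rawTableId == null) {
               return null;
           }
           int dot = rawTableId.indexOf('.');
           return dot < 0 ? null : rawTableId.substring(0, dot);
       }

       // Last segment of the id (the whole id if it contains no dot).
       public String getTable() {
           if (rawTableId == null) {
               return null;
           }
           return rawTableId.substring(rawTableId.lastIndexOf('.') + 1);
       }

       // Read-only view of the current row; the wrapper is allocated only when a UDF asks.
       public List<Object> getAllFields() {
           if (fields == null) {
               return Collections.emptyList();
           }
           return Collections.unmodifiableList(Arrays.asList(fields));
       }
   }
   ```

   `update` only assigns references; the read-only list wrapper is created lazily, so UDFs that never call `getAllFields()` cost nothing extra.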
   
   ### 3. Engine Enhancements
   
   *   **Lifecycle Integration**: The `ZetaSQLEngine` will call `open()` on all 
registered UDFs during preparation and `close()` during shutdown.
   *   **Context Injection**: The engine will maintain a shared 
`ZetaUDFContext` instance and update it for each row (using a zero-allocation, 
mutable pattern) before executing UDFs.
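   The lifecycle integration could look roughly like the sketch below. `UdfLifecycle` and `LifecycleUDF` are hypothetical names, not existing `ZetaSQLEngine` code, and the interface is trimmed to the two proposed hooks. The sketch also assumes two policies the proposal does not spell out: if one `open()` fails, UDFs that were already opened are closed before the error propagates; and `close()` runs in reverse order of opening and keeps going when one UDF's `close()` throws.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical minimal contract: only the proposed lifecycle hooks.
   interface LifecycleUDF {
       default void open() throws Exception {}

       default void close() {}
   }

   // Sketch of how an engine might drive open()/close(); not ZetaSQLEngine's code.
   final class UdfLifecycle implements AutoCloseable {
       private final List<LifecycleUDF> opened = new ArrayList<>();

       // Open every UDF once. If one fails, close those already opened and rethrow.
       void openAll(List<? extends LifecycleUDF> udfs) throws Exception {
           for (LifecycleUDF udf : udfs) {
               try {
                   udf.open();
               } catch (Exception e) {
                   close();
                   throw e;
               }
               opened.add(udf);
           }
       }

       // Close in reverse order; one failing close() must not skip the others.
       @Override
       public void close() {
           for (int i = opened.size() - 1; i >= 0; i--) {
               try {
                   opened.get(i).close();
               } catch (RuntimeException ignored) {
                   // a real engine would log this failure
               }
           }
           opened.clear();
       }
   }
   ```

   Implementing `AutoCloseable` lets the engine wrap its processing loop in try-with-resources, so `close()` also runs when processing fails.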
   
   ## Describe alternatives you've considered
   
   *   **ThreadLocal Context**: Considered using `ThreadLocal` to pass context 
implicitly.
       *   *Rejected*: Hard to manage lifecycle in different execution 
environments (Spark/Flink/SeaTunnel Engine), potential for memory leaks, and 
less explicit API contract.
   *   **Refactoring `evaluate` Signature**: Changing `evaluate(List<Object> args)` to accept context.
       *   *Rejected*: This would break all existing custom UDFs. The proposed `default` method approach ensures seamless backward compatibility.
   
   ## Additional context
   
   ### Performance Considerations
   The proposed implementation focuses on **Zero-Copy** and **Object Reuse**:
   *   The `ZetaUDFContext` instance is created once and reused.
   *   Updating the context for each row involves only reference assignments 
(`context.update(fields, row)`).
   *   No new objects are allocated per row processing.
   *   Expected performance impact is negligible (< 2%) for UDFs that do not 
use the context features.
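   The per-row path described above can be sketched as follows, again with hypothetical, trimmed-down types (`RowEvaluator`, `beginRow` and `call` are illustrative names, and the context only models the table id and the row's field array). It assumes `requiresContext()` is constant for a UDF instance, so it is sampled once when the evaluator is built; context-free UDFs then go straight to `evaluate(args)` as they do today, and the only added per-row work is the reference assignments in `update`.

   ```java
   import java.util.List;

   // Hypothetical, trimmed-down types: only what the per-row path needs.
   final class ZetaUDFContext {
       private String rawTableId;
       private Object[] fields;

       // Per-row refresh: two reference assignments, no allocation.
       void update(String rawTableId, Object[] fields) {
           this.rawTableId = rawTableId;
           this.fields = fields;
       }

       public String getRawTableId() {
           return rawTableId;
       }

       public Object getField(int index) {
           return fields[index];
       }
   }

   interface ZetaUDF {
       Object evaluate(List<Object> args);

       default boolean requiresContext() {
           return false;
       }

       default Object evaluateWithContext(List<Object> args, ZetaUDFContext context) {
           return evaluate(args);
       }
   }

   // Sketch of a per-row evaluator with one shared context. requiresContext() is
   // sampled once up front (assumed constant for a UDF instance).
   final class RowEvaluator {
       private final ZetaUDF[] udfs;
       private final boolean[] needsContext;
       private final ZetaUDFContext context = new ZetaUDFContext(); // created once, reused

       RowEvaluator(ZetaUDF... udfs) {
           this.udfs = udfs;
           this.needsContext = new boolean[udfs.length];
           for (int i = 0; i < udfs.length; i++) {
               needsContext[i] = udfs[i].requiresContext();
           }
       }

       ZetaUDFContext context() {
           return context;
       }

       // Call once per row, before any UDF is evaluated for that row.
       void beginRow(String rawTableId, Object[] rowFields) {
           context.update(rawTableId, rowFields);
       }

       // Context-free UDFs keep the existing evaluate(args) path.
       Object call(int udfIndex, List<Object> args) {
           ZetaUDF udf = udfs[udfIndex];
           return needsContext[udfIndex] ? udf.evaluateWithContext(args, context) : udf.evaluate(args);
       }
   }
   ```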
   
   ### Example Usage (Encryption)
   
   ```java
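   public class EncryptUDF implements ZetaUDF {
       // CryptoClient stands in for an expensive client (e.g., a KMS SDK).
       // transient: ZetaUDF is Serializable, and open() recreates the client.
       private transient CryptoClient client;
   
       @Override
       public void open() {
           this.client = new CryptoClient(); // Init once
       }
   
       @Override
       public boolean requiresContext() { return true; }
   
       @Override
       public Object evaluateWithContext(List<Object> args, ZetaUDFContext context) {
           String tableId = context.getRawTableId();
           // Use the table id as part of the encryption key derivation
           return client.encrypt(args.get(0), tableId);
       }
   
       @Override
       public Object evaluate(List<Object> args) {
           // Not called by the proposed engine, since requiresContext() returns true
           throw new UnsupportedOperationException("EncryptUDF requires row context");
       }
   
       @Override
       public void close() {
           // Release the client created in open() (assumes CryptoClient has close())
           if (client != null) {
               client.close();
               client = null;
           }
       }
   
       // Other existing ZetaUDF methods omitted for brevity
   }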
   ```
   

