davidzollo opened a new issue, #10460:
URL: https://github.com/apache/seatunnel/issues/10460
## Is your feature request related to a problem? Please describe
Currently, the SQL UDF mechanism (`ZetaUDF`) in SeaTunnel Transform is
stateless and isolated. It handles data purely based on the input arguments
passed to the function. However, in complex production scenarios, this design
has several limitations:
1. **Lack of Context Access**: UDFs cannot access row-level metadata such
as the source `database`, `table` name, `rowKind` (INSERT/UPDATE/DELETE), or
other fields in the row that are not explicitly passed as arguments.
2. **No Lifecycle Management**: There are no `open()` or `close()` hooks.
This makes it impossible to initialize expensive resources (e.g., KMS clients,
database connections, cache loaders) once and reuse them. Users are forced to
initialize resources per row or use static singletons, which are hard to manage
and test.
3. **Limited Stateful Capabilities**: Implementing logic that depends on
the execution context (e.g., encrypting data using a key derived from the table
name) is currently impossible.
## Describe the solution you'd like
I propose enhancing the `ZetaUDF` interface and the SQL execution engine to
support **Context-Aware UDFs** and **Lifecycle Management**, while maintaining
**100% backward compatibility** with existing UDFs.
### 1. `ZetaUDF` Interface Extension
Introduce new `default` methods to the `ZetaUDF` interface. Existing UDFs do
not need to change.
```java
public interface ZetaUDF extends Serializable {
    // ... existing methods ...

    /**
     * Indicate whether this UDF requires row-level context.
     * Default returns false for backward compatibility.
     */
    default boolean requiresContext() {
        return false;
    }

    /**
     * Evaluate the function with context.
     * Default implementation falls back to evaluate(args).
     */
    default Object evaluateWithContext(List<Object> args, ZetaUDFContext context) {
        return evaluate(args);
    }

    /**
     * Initialize resources. Called once during engine startup.
     */
    default void open() throws Exception {}

    /**
     * Release resources. Called once during engine shutdown.
     */
    default void close() {}
}
```
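To illustrate the compatibility claim, here is a minimal sketch of a UDF that implements only `evaluate` and still works when called through the new entry point. `CompatUDF`, `CompatContext`, and `UpperCaseUDF` are hypothetical names used for illustration; the interface is a reduced copy of the proposal (the existing `functionName()` and `resultType()` methods are left out), not the real `ZetaUDF`.

```java
import java.io.Serializable;
import java.util.List;
import java.util.Locale;

// Hypothetical placeholder for the proposed ZetaUDFContext.
final class CompatContext {}

// Reduced copy of the proposed interface, for illustration only.
interface CompatUDF extends Serializable {
    Object evaluate(List<Object> args);

    default boolean requiresContext() {
        return false;
    }

    // Falls back to the context-free method unless a UDF overrides it.
    default Object evaluateWithContext(List<Object> args, CompatContext context) {
        return evaluate(args);
    }

    default void open() throws Exception {}

    default void close() {}
}

// Written as if it predates the extension: it only knows about evaluate().
final class UpperCaseUDF implements CompatUDF {
    @Override
    public Object evaluate(List<Object> args) {
        Object value = args.get(0);
        return value == null ? null : value.toString().toUpperCase(Locale.ROOT);
    }
}
```

Because every new method has a default body, `UpperCaseUDF` compiles unchanged, reports `requiresContext() == false`, and produces the same result through either entry point.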
### 2. Introduce `ZetaUDFContext`
A lightweight, mutable context class that provides access to the runtime
environment.
* **Metadata Access**: `getRawTableId()`, `getDatabase()`, `getTable()`,
`getRowKind()`.
* **Data Access**: `getAllFields()` (read-only access to the full row).
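
The context described above might look roughly like the following. This is a sketch under assumptions, not the proposed implementation: `SketchUDFContext` is a hypothetical name, the `update` signature is simplified, and plain `String` and `Object[]` stand in for SeaTunnel's row-kind and row types. Splitting the raw table ID on `.` is likewise an assumption made only for the example.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the proposed ZetaUDFContext.
final class SketchUDFContext {
    private String rawTableId; // e.g. "shop.orders" (format assumed)
    private String rowKind;    // e.g. "INSERT"
    private Object[] fields;   // reference to the current row's fields, not a copy

    // Called by the engine once per row; only reference assignments happen here.
    void update(String rawTableId, String rowKind, Object[] fields) {
        this.rawTableId = rawTableId;
        this.rowKind = rowKind;
        this.fields = fields;
    }

    String getRawTableId() {
        return rawTableId;
    }

    String getRowKind() {
        return rowKind;
    }

    // Assumes the first dot-separated segment is the database name.
    String getDatabase() {
        int dot = rawTableId == null ? -1 : rawTableId.indexOf('.');
        return dot < 0 ? null : rawTableId.substring(0, dot);
    }

    // Assumes the last dot-separated segment is the table name.
    String getTable() {
        if (rawTableId == null) {
            return null;
        }
        return rawTableId.substring(rawTableId.lastIndexOf('.') + 1);
    }

    // Read-only view over the row. The row data is not copied, but two small
    // wrapper objects are created each time a UDF calls this method.
    List<Object> getAllFields() {
        return Collections.unmodifiableList(Arrays.asList(fields));
    }
}
```

In this sketch the per-row `update` call allocates nothing; allocation happens only when a UDF actually asks for derived values such as `getDatabase()` or `getAllFields()`.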
### 3. Engine Enhancements
* **Lifecycle Integration**: The `ZetaSQLEngine` will call `open()` on all
registered UDFs during preparation and `close()` during shutdown.
* **Context Injection**: The engine will maintain a shared
`ZetaUDFContext` instance and update it for each row (using a zero-allocation,
mutable pattern) before executing UDFs.
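
The two engine responsibilities above can be sketched as follows. All names here (`SketchEngine`, `SketchUDF`, `SketchContext`, `prepare`, `beginRow`, `invoke`) are hypothetical and do not reflect the actual `ZetaSQLEngine` internals. Wrapping a failed `open()` in an unchecked exception and closing in reverse order are design assumptions of this sketch, not part of the proposal.

```java
import java.util.ArrayList;
import java.util.List;

// Reduced copy of the proposed interface, for illustration only.
interface SketchUDF {
    Object evaluate(List<Object> args);
    default boolean requiresContext() { return false; }
    default Object evaluateWithContext(List<Object> args, SketchContext context) {
        return evaluate(args);
    }
    default void open() throws Exception {}
    default void close() {}
}

// Minimal mutable context carrying only the raw table ID.
final class SketchContext {
    private String rawTableId;
    void update(String rawTableId) { this.rawTableId = rawTableId; }
    String getRawTableId() { return rawTableId; }
}

// Hypothetical engine wrapper showing lifecycle calls and context injection.
final class SketchEngine implements AutoCloseable {
    private final List<SketchUDF> registered;
    private final List<SketchUDF> opened = new ArrayList<>();
    private final SketchContext context = new SketchContext(); // created once, reused

    SketchEngine(List<SketchUDF> registered) {
        this.registered = registered;
    }

    // Opens every registered UDF; on failure, closes the ones already opened.
    void prepare() {
        for (SketchUDF udf : registered) {
            try {
                udf.open();
                opened.add(udf);
            } catch (Exception e) {
                close();
                throw new IllegalStateException("UDF open() failed", e);
            }
        }
    }

    // Called once per row, before any UDF in that row is evaluated.
    void beginRow(String rawTableId) {
        context.update(rawTableId);
    }

    // Context-aware UDFs receive the shared context; others take the old path.
    Object invoke(SketchUDF udf, List<Object> args) {
        return udf.requiresContext()
                ? udf.evaluateWithContext(args, context)
                : udf.evaluate(args);
    }

    // Closes opened UDFs in reverse order of opening.
    @Override
    public void close() {
        for (int i = opened.size() - 1; i >= 0; i--) {
            opened.get(i).close();
        }
        opened.clear();
    }
}
```

Because the same context instance is handed to every call, a UDF in this sketch should read what it needs during `evaluateWithContext` and not keep a reference to the context beyond that call.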
## Describe alternatives you've considered
* **ThreadLocal Context**: Pass the context implicitly through a `ThreadLocal`.
    * *Rejected*: The lifecycle is hard to manage across execution
environments (Spark/Flink/SeaTunnel Engine), it risks memory leaks, and the
API contract is less explicit.
* **Refactoring the `evaluate` Signature**: Change `evaluate(List<Object>
args)` to accept a context.
    * *Rejected*: This would break all existing custom UDFs. The proposed
`default` method approach keeps them source-compatible.
## Additional context
### Performance Considerations
The implementation focuses on **Zero-Copy** and **Object Reuse**:
* The `ZetaUDFContext` instance is created once and reused.
* Updating the context for each row involves only reference assignments
(`context.update(fields, row)`).
* No new objects are allocated per row.
* Expected performance impact is negligible (< 2%) for UDFs that do not
use the context features.
### Example Usage (Encryption)
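```java
public class EncryptUDF implements ZetaUDF {
    // CryptoClient stands in for any expensive client (e.g., a KMS client).
    // transient: ZetaUDF is Serializable, so the client is built in open()
    // instead of being serialized with the UDF.
    private transient CryptoClient client;

    // functionName(), resultType() and evaluate() omitted for brevity.

    @Override
    public void open() {
        this.client = new CryptoClient(); // Init once
    }

    @Override
    public boolean requiresContext() {
        return true;
    }

    @Override
    public Object evaluateWithContext(List<Object> args, ZetaUDFContext context) {
        String tableId = context.getRawTableId();
        // Use the table ID as part of the encryption key derivation
        return client.encrypt(args.get(0), tableId);
    }

    @Override
    public void close() {
        client.close(); // assumes CryptoClient exposes close()
    }
}
```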