DanielCarter-stack commented on PR #10489:
URL: https://github.com/apache/seatunnel/pull/10489#issuecomment-3895119958
### Issue 1: UDF state not restored after Checkpoint
**Location**:
`seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-udf/src/main/java/org/apache/seatunnel/e2e/transform/udf/EncryptUDF.java:32`
```java
private transient CryptoClient client;

@Override
public void open() {
    this.client = new CryptoClient();
}

@Override
public Object evaluateWithContext(List<Object> args, ZetaUDFContext context) {
    if (client == null) { // Triggered after Checkpoint recovery
        throw new IllegalStateException("open() was not called before evaluateWithContext()");
    }
    // ...
}
```
**Related Context**:
- Parent class/interface: `ZetaUDF.java:70-73`
- Caller: `ZetaSQLEngine.java:108-120` (`openUDFs`)
- Caller: `ZetaSQLFunction.java:668-670` (`evaluateWithContext`)
**Problem Description**:
When a SeaTunnel task is restored from a checkpoint:
1. The `SQLTransform.sqlEngine` field is `transient`, so it is `null` after restoration
2. `transformRow()` calls `tryOpen()`, which re-initializes `sqlEngine`
3. However, if the checkpoint was taken after `open()`, the UDF instances in `udfList` are deserialized during restoration
4. Each UDF instance's `transient` fields (such as `client`) are `null` after deserialization
5. The next call to `evaluateWithContext()` throws `IllegalStateException`
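
The null-after-restore behaviour in steps 3 to 5 follows from standard Java serialization, which skips `transient` fields. A minimal, self-contained sketch of that effect is shown here. `DemoUdf` and `roundTrip` are hypothetical names used only for illustration, and the sketch assumes the restore path uses plain Java serialization, which this review does not confirm for every SeaTunnel engine.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientRestoreDemo {

    /** Hypothetical stand-in for a UDF that keeps a client in a transient field. */
    static class DemoUdf implements Serializable {
        private static final long serialVersionUID = 1L;
        private transient Object client;

        void open() {
            client = new Object();
        }

        boolean isOpen() {
            return client != null;
        }
    }

    /** Serializes and deserializes a value, similar to restoring it from a snapshot. */
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Round trip failed", e);
        }
    }

    public static void main(String[] args) {
        DemoUdf udf = new DemoUdf();
        udf.open();
        DemoUdf restored = roundTrip(udf);
        // The transient field is not written, so the restored copy is no longer "open".
        System.out.println("before: " + udf.isOpen() + ", after: " + restored.isOpen());
    }
}
```

Running `main` prints `before: true, after: false`: the restored instance needs `open()` to be called again before it can be used.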
**Potential Risks**:
- In production, when a task recovers from a failure, every UDF that holds resources in `transient` fields will fail
- Impacts SeaTunnel's high availability feature
- The error message (`open() was not called`) does not point users to the actual cause
**Impact Scope**:
- Direct impact: All UDF implementations using `transient` fields
- Indirect impact: Data processing jobs dependent on UDFs
- Affected area: SQL Transform (single Transform)
**Severity**: **MAJOR**
**Improvement Suggestions**:
**Solution 1: Let UDF implement `CheckpointListener`**
```java
// ZetaUDF.java
public interface ZetaUDF extends Serializable, CheckpointListener {
    // ...
    default void notifyCheckpointComplete(long checkpointId) {}
    default void notifyCheckpointRestore(long checkpointId) {}
}
```
**Solution 2: Re-initialize at `SQLTransform` level**
```java
// SQLTransform.java
@Override
public void open() {
    if (sqlEngine == null) {
        sqlEngine = SQLEngineFactory.getSQLEngine(engineType);
        // ...
    }
    // Reinitialize UDFs each time the transform is reopened
    sqlEngine.reopenUDFs(); // This method needs to be added
}
```
**Solution 3: Document UDF state management limitations**
Explicitly state in documentation:
- UDFs should not store state in `transient` fields
- For state management, external state backends should be used
**Rationale**:
- Checkpoint/Restore is a core feature of distributed computing frameworks
- The current implementation fails after restoration with an error that does not reveal the cause
- Users cannot work around this in UDF code (other than by avoiding `transient` fields)
---
### Issue 2: ZetaUDFContext lacks null safety checks
**Location**:
`seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaUDFContext.java:108-110`
```java
public Object[] getAllFields() {
    return allFields;
}
```
**Related Context**:
- Caller 1: `ZetaSQLFunction.java:247-252` (`updateUDFContext`)
- Caller 2: UDF implementations (user code)
**Problem Description**:
`getAllFields()` returns a direct reference to the internal array, which poses the following risks:
1. UDF implementations may modify the array contents
2. If the `fields` parameter is `null`, it returns `EMPTY_FIELDS` (the field is `final`, but the semantics of this return value are unclear)
3. No JavaDoc explains whether the return value is mutable
**Potential Risks**:
- UDF accidentally modifies context array
- Multiple calls return different content (violates immutability
expectations)
- State inconsistency in concurrent scenarios
**Impact Scope**:
- Direct impact: UDFs using `context.getAllFields()`
- Indirect impact: UDFs dependent on `getAllFields()` immutability
- Affected area: SQL Transform (single Transform)
**Severity**: **MINOR**
**Improvement Suggestions**:
```java
// ZetaUDFContext.java
/**
 * Get all fields of the current row.
 * Note: The returned array is a copy and modifying it will not affect the context.
 *
 * @return a copy of all fields, or null if the row has no fields
 */
@Nullable
public Object[] getAllFields() {
    return allFields.length == 0 ? null : Arrays.copyOf(allFields, allFields.length);
}
```
**Rationale**:
- Defensive copy prevents accidental modification
- Clear JavaDoc explains behavior
- Returning `null` is more Java-conventional than `EMPTY_FIELDS`
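
To make the aliasing risk concrete, here is a small self-contained sketch contrasting a getter that returns the internal array with one that returns a copy. `RowContext` and both getter names are hypothetical and are not the real `ZetaUDFContext` API; the sketch also returns an empty copy rather than `null` for rows without fields, purely to keep the example short.

```java
import java.util.Arrays;

public class FieldSnapshotDemo {

    /** Hypothetical holder used for illustration; not the real ZetaUDFContext. */
    static class RowContext {
        private static final Object[] EMPTY_FIELDS = new Object[0];
        private Object[] allFields = EMPTY_FIELDS;

        void update(Object[] fields) {
            allFields = fields == null ? EMPTY_FIELDS : fields;
        }

        /** Exposes the internal array: writes by the caller change the context. */
        Object[] getAllFieldsDirect() {
            return allFields;
        }

        /** Returns a shallow copy: writes by the caller do not reach the context. */
        Object[] getAllFieldsCopy() {
            return allFields.clone();
        }
    }

    public static void main(String[] args) {
        RowContext context = new RowContext();
        context.update(new Object[] {"id-1", 42});

        // Writing to the copy leaves the context untouched.
        context.getAllFieldsCopy()[0] = "changed";
        System.out.println(Arrays.toString(context.getAllFieldsDirect())); // [id-1, 42]

        // Writing to the direct reference leaks into the context.
        context.getAllFieldsDirect()[0] = "changed";
        System.out.println(Arrays.toString(context.getAllFieldsDirect())); // [changed, 42]
    }
}
```

The copy is shallow, so mutable field values are still shared, and copying on every call adds a per-row allocation that may matter on hot paths.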
---
### Issue 3: EncryptUDF unnecessarily throws UnsupportedOperationException
**Location**:
`seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-udf/src/main/java/org/apache/seatunnel/e2e/transform/udf/EncryptUDF.java:55-57`
```java
@Override
public Object evaluate(List<Object> args) {
    throw new UnsupportedOperationException("ENCRYPT should be called with context");
}
```
**Related Context**:
- Interface definition: `ZetaUDF.java:47`
- Caller: `ZetaSQLFunction.java:666-673`
**Problem Description**:
Although this UDF returns `true` from `requiresContext()`, it still overrides `evaluate()` only to throw an exception:
1. The caller invokes `evaluate()` when `udfContext == null`
2. The exception message is not user-friendly
3. It violates the principle that "interface default implementations should be callable"
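
The failure mode can be sketched with a simplified dispatch rule. Everything in this example is hypothetical: `DemoUdf`, `DemoContext`, and `call` only model the behaviour described above (fall back to `evaluate()` when no context is available) and are not the actual `ZetaUDF` or `ZetaSQLFunction` code.

```java
import java.util.Arrays;
import java.util.List;

public class ContextDispatchDemo {

    /** Hypothetical context type standing in for ZetaUDFContext. */
    static class DemoContext {
        final String tableId;

        DemoContext(String tableId) {
            this.tableId = tableId;
        }
    }

    /** Hypothetical UDF contract modelled on the description in this review. */
    interface DemoUdf {
        Object evaluate(List<Object> args);

        default Object evaluateWithContext(List<Object> args, DemoContext context) {
            return evaluate(args);
        }
    }

    /** Mirrors the pattern under review: a context-only UDF that rejects evaluate(). */
    static class ContextOnlyUdf implements DemoUdf {
        @Override
        public Object evaluate(List<Object> args) {
            throw new UnsupportedOperationException("ENCRYPT should be called with context");
        }

        @Override
        public Object evaluateWithContext(List<Object> args, DemoContext context) {
            return "ENC(" + context.tableId + "):" + args.get(0);
        }
    }

    /** Assumed dispatch rule: use evaluate() when no context is available. */
    static Object call(DemoUdf udf, List<Object> args, DemoContext context) {
        return context == null ? udf.evaluate(args) : udf.evaluateWithContext(args, context);
    }

    public static void main(String[] args) {
        DemoUdf udf = new ContextOnlyUdf();
        List<Object> values = Arrays.<Object>asList("secret");
        System.out.println(call(udf, values, new DemoContext("db.t")));
        try {
            call(udf, values, null);
        } catch (UnsupportedOperationException e) {
            System.out.println("no context: " + e.getMessage());
        }
    }
}
```

With a context the call succeeds; without one, the same UDF fails at runtime even though nothing in its signature warns the caller.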
**Potential Risks**:
- Throws confusing exceptions in edge cases (`udfContext == null`)
- If a code path bypasses the `requiresContext()` check, it will cause runtime errors
- Inconsistent with the interface default implementation (`return evaluate(args)`)
**Impact Scope**:
- Direct impact: `EncryptUDF` (example code)
- Indirect impact: User UDFs imitating this pattern
- Affected area: SQL Transform (single Transform)
**Severity**: **MINOR**
**Improvement Suggestions**:
```java
// EncryptUDF.java
@Override
public Object evaluate(List<Object> args) {
    // Fallback to context-less evaluation if needed
    Object value = args.get(0);
    if (value == null) {
        return null;
    }
    // Simplified implementation: no tableId-related encryption
    return "ENC(-1):" + value;
}

@Override
public Object evaluateWithContext(List<Object> args, ZetaUDFContext context) {
    if (client == null) {
        throw new IllegalStateException("open() was not called before evaluateWithContext()");
    }
    Object value = args.get(0);
    if (value == null) {
        return null;
    }
    String tableId = context.getRawTableId();
    return client.encrypt(value, tableId);
}
```
**Rationale**:
- Provides reasonable fallback behavior
- Maintains interface consistency
- Better user experience (still works in edge cases)
---
### Issue 4: Missing unit tests covering core logic
**Location**:
`seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaUDFContext.java:61-83`
**Related Context**:
- Test directory:
`seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/`
**Problem Description**:
The `ZetaUDFContext` class added in this PR has no unit tests. The following cases are not covered:
1. Accessing `getDatabase()/getSchema()/getTable()` when `tableId = null`
2. Exception handling for `tableId = "invalid"` format
3. Storage and re-throwing of `tablePathParseException`
4. Short-circuit logic for multiple calls to `resolveTablePathIfNeeded()`
**Potential Risks**:
- Behavior in edge cases is unverified
- Refactoring may introduce regressions
- Code reviewers cannot understand expected behavior through tests
**Impact Scope**:
- Direct impact: Maintainability of `ZetaUDFContext`
- Indirect impact: Future developers modifying this class
- Affected area: SQL Transform (single Transform)
**Severity**: **MINOR**
**Improvement Suggestions**:
Add unit test `ZetaUDFContextTest.java`:
```java
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class ZetaUDFContextTest {

    @Test
    public void testNullTableId() {
        ZetaUDFContext context = new ZetaUDFContext();
        SeaTunnelRow row = new SeaTunnelRow(new Object[0]);
        row.setTableId(null);
        context.update(row.getFields(), row);
        assertNull(context.getRawTableId());
        assertNull(context.getDatabase()); // Should return null, not throw exception
        assertNull(context.getSchema());
        assertNull(context.getTable());
        assertEquals(RowKind.INSERT, context.getRowKind());
    }

    @Test
    public void testInvalidTableIdFormat() {
        ZetaUDFContext context = new ZetaUDFContext();
        SeaTunnelRow row = new SeaTunnelRow(new Object[0]);
        row.setTableId("invalid.format.with.too.many.parts");
        context.update(row.getFields(), row);
        // First access should throw exception
        assertThrows(IllegalArgumentException.class, () -> context.getDatabase());
        // Subsequent access should rethrow same exception
        IllegalArgumentException ex =
                assertThrows(IllegalArgumentException.class, () -> context.getTable());
        assertTrue(ex.getMessage().contains("Cannot get split"));
    }

    @Test
    public void testValidTableIdFormats() {
        // Test "db", "db.table", "db.schema.table" formats
        // ...
    }

    @Test
    public void testTablePathResolutionLazy() {
        ZetaUDFContext context = new ZetaUDFContext();
        SeaTunnelRow row = new SeaTunnelRow(new Object[0]);
        row.setTableId("db.table");
        context.update(row.getFields(), row);
        // tablePathResolved should be false before getDatabase() is accessed
        // ...
    }
}
```
**Rationale**:
- Unit tests are insurance for code quality
- Verifies edge cases and exception handling
- Provides documentation for future maintenance
- Apache top-level projects should have adequate test coverage
---
### Issue 5: closeUDFs may swallow exceptions during openUDFs failure rollback
**Location**:
`seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java:114-120`
```java
try {
    udf.open();
} catch (Exception e) {
    closeUDFs(i - 1); // Try to close already opened UDFs
    throw new TransformException(
            CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
            String.format("Open udf %s failed: %s", udf.functionName(), e.getMessage()));
}
```
**Related Context**:
- Caller: `ZetaSQLEngine.java:98` (`init()` method)
- Related method: `ZetaSQLEngine.java:351-359` (`closeUDFs`)
**Problem Description**:
When `udf.open()` throws an exception:
1. It calls `closeUDFs(i - 1)` to close the previously opened UDFs in reverse order
2. Exceptions thrown by `close()` inside `closeUDFs` are swallowed (only logged at `warn` level)
3. The resulting `TransformException` carries only the message of the `open()` failure
4. If `close()` also fails for one or more UDFs, none of those failures reach the caller
**Potential Risks**:
- Resource cleanup failures are silently swallowed
- Users cannot diagnose why resources are not released
- May lead to file handle/connection leaks
**Impact Scope**:
- Direct impact: Error diagnosis when `open()` fails
- Indirect impact: Resource management and troubleshooting
- Affected area: SQL Transform (single Transform)
**Severity**: **MINOR**
**Improvement Suggestions**:
```java
// ZetaSQLEngine.java
private void openUDFs() {
List<Exception> closeErrors = new ArrayList<>();
for (int i = 0; i < udfList.size(); i++) {
ZetaUDF udf = udfList.get(i);
try {
udf.open();
} catch (Exception e) {
// Collect all exceptions when closing
closeUDFs(i - 1, closeErrors);
// Build detailed error message
String errorMsg = String.format(
"Open udf %s failed: %s. UDFs [%s] were closed, with %d
close errors",
udf.functionName(),
e.getMessage(),
describeUdfRange(0, i - 1),
closeErrors.size());
throw new TransformException(
CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
errorMsg);
}
}
}
private void closeUDFs(int lastIndex, List<Exception> errors) {
for (int i = lastIndex; i >= 0; i--) {
try {
udfList.get(i).close();
} catch (Exception e) {
log.error("Close udf {} failed during rollback",
udfList.get(i).functionName(), e);
if (errors != null) {
errors.add(e);
}
}
}
}
private String describeUdfRange(int start, int end) {
return udfList.subList(start, end + 1).stream()
.map(ZetaUDF::functionName)
.collect(Collectors.joining(", "));
}
```
**Rationale**:
- Preserve all exception information for diagnosis
- Detailed error messages help users understand what happened
- Use `log.error` instead of `log.warn` (error rather than warning)
- `closeUDFs` maintains backward compatibility (`errors` parameter can be
`null`)
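
As an alternative way to keep every failure visible, the standard `Throwable.addSuppressed` mechanism can attach rollback errors to the primary exception instead of only counting them. The sketch below is a self-contained illustration under that assumption. `Resource`, `openAll`, and `resource` are hypothetical names, and `IllegalStateException` stands in for `TransformException`, whose constructors are not shown in this review.

```java
import java.util.ArrayList;
import java.util.List;

public class RollbackDemo {

    /** Hypothetical resource standing in for a UDF with open()/close(). */
    interface Resource {
        String name();

        void open() throws Exception;

        void close() throws Exception;
    }

    /** Opens all resources; on failure, closes the opened ones in reverse and keeps every error. */
    static void openAll(List<Resource> resources) {
        for (int i = 0; i < resources.size(); i++) {
            Resource current = resources.get(i);
            try {
                current.open();
            } catch (Exception openError) {
                // Keep the original failure as the cause rather than only its message.
                IllegalStateException failure =
                        new IllegalStateException("Open " + current.name() + " failed", openError);
                for (int j = i - 1; j >= 0; j--) {
                    try {
                        resources.get(j).close();
                    } catch (Exception closeError) {
                        // Attach each rollback failure so none is lost.
                        failure.addSuppressed(closeError);
                    }
                }
                throw failure;
            }
        }
    }

    /** Builds a resource that optionally fails on open() or close(). */
    static Resource resource(String name, boolean failOpen, boolean failClose) {
        return new Resource() {
            @Override
            public String name() {
                return name;
            }

            @Override
            public void open() throws Exception {
                if (failOpen) {
                    throw new Exception("open " + name);
                }
            }

            @Override
            public void close() throws Exception {
                if (failClose) {
                    throw new Exception("close " + name);
                }
            }
        };
    }

    public static void main(String[] args) {
        List<Resource> resources = new ArrayList<>();
        resources.add(resource("a", false, true));
        resources.add(resource("b", false, false));
        resources.add(resource("c", true, false));
        try {
            openAll(resources);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage() + ", cause: " + e.getCause().getMessage()
                    + ", suppressed: " + e.getSuppressed().length);
        }
    }
}
```

Suppressed exceptions appear in the stack trace of the primary exception, so the rollback failures stay visible without a separate error list being threaded through `closeUDFs`.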
---
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.