kosiew opened a new pull request, #64960:
URL: https://github.com/apache/airflow/pull/64960
### Summary
Validate `databricks_retry_args` / `retry_args` for deferrable Databricks
operators, sensors, and triggers to ensure they are serialization-safe before
crossing the trigger boundary. Non-serializable values (e.g., Tenacity strategy
callables) now raise a clear `ValueError` at initialization/execution time.
---
### Motivation / Problem
Deferrable Databricks execution forwards retry configuration through the
trigger serialization boundary. Non-serializable objects (such as
`tenacity.wait_incrementing(...)` or `tenacity.stop_after_attempt(...)`) cannot
be serialized by Airflow’s serde layer and fail at runtime in the triggerer,
making debugging difficult.
---
### What this PR does
* Introduces the `validate_deferrable_databricks_retry_args` utility to assert
  JSON/serde-serializability of retry args.
* Invokes validation in:
  * `DatabricksExecutionTrigger`
  * `DatabricksSQLStatementExecutionTrigger`
* Ensures deferrable operator/sensor paths fail fast when invalid retry args
  are provided.
* Adds unit tests covering:
  * Operators rejecting non-serializable retry args in deferrable mode
  * Sensors rejecting non-serializable retry args in deferrable mode
  * Trigger initialization rejecting invalid retry args
---
### Behavior change
**Before:**
* Invalid retry args fail later, during triggerer serialization, with unclear
  errors.
**After:**
* Immediate `ValueError` with an actionable message:
  * "Use JSON-serializable values, remove callable retry strategies, or
    disable deferrable mode."
---
### Example of unsupported config
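```python
from tenacity import wait_incrementing

from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

DatabricksSubmitRunOperator(
    task_id="example",
    deferrable=True,
    # A Tenacity strategy object is a callable and cannot be serialized.
    databricks_retry_args={"wait": wait_incrementing(1, 1, 3)},  # ❌ now rejected
)
```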
---
### Example of supported config
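```python
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

DatabricksSubmitRunOperator(
    task_id="example",
    deferrable=True,
    # Plain ints and strings survive the trigger serialization boundary.
    databricks_retry_args={"max_retries": 3, "delay": 5},  # ✅ JSON-serializable
)
```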
---
### Implementation details
* New module: `providers/databricks/utils/retry.py`
* Uses Airflow serde (`airflow.sdk.serde.serialize`) to validate
  compatibility
* Catches `AttributeError`, `RecursionError`, and `TypeError` and re-raises
  them as `ValueError`
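
The validation flow described above can be sketched roughly as follows. This is an illustration, not the code in the PR: the function name `validate_retry_args_sketch` and the class `FakeWaitStrategy` are hypothetical, and the standard library's `json.dumps` stands in for Airflow's serde so the snippet runs without Airflow or Tenacity installed. Only the caught exception types and the wording of the error hint are taken from this description.

```python
from __future__ import annotations

import json
from typing import Any


def validate_retry_args_sketch(retry_args: dict[str, Any] | None) -> None:
    """Raise ValueError if retry_args cannot be serialized.

    Assumption: json.dumps is used here as a stand-in for the Airflow
    serde call that the PR description mentions.
    """
    if retry_args is None:
        return
    try:
        json.dumps(retry_args)
    except (AttributeError, RecursionError, TypeError) as err:
        # Convert low-level serialization failures into one clear error.
        raise ValueError(
            "Retry args are not serializable. "
            "Use JSON-serializable values, remove callable retry strategies, "
            "or disable deferrable mode."
        ) from err


class FakeWaitStrategy:
    """Hypothetical stand-in for a Tenacity strategy object."""

    def __call__(self, retry_state: Any) -> float:
        return 1.0


# Plain values pass silently.
validate_retry_args_sketch({"max_retries": 3, "delay": 5})

# A callable strategy object is rejected up front.
try:
    validate_retry_args_sketch({"wait": FakeWaitStrategy()})
except ValueError as err:
    print(f"rejected: {err}")
```

Chaining with `from err` keeps the original serialization failure available in the traceback, which helps when the offending value is nested deep inside the retry configuration.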
---
### Tests
* Added shared test utilities for invalid retry args
* Parametrized tests using Tenacity objects:
  * `wait_incrementing`
  * `stop_after_attempt`
* Coverage includes:
  * Operator execution (deferrable)
  * Sensor execution (deferrable)
  * Trigger initialization
---
### Backward compatibility
* No impact for non-deferrable usage
* Only affects misconfigured deferrable retry args
* Valid JSON-compatible retry configurations remain unaffected
---
### Documentation
* No user-facing docs required (error message is self-explanatory)
---
### Checklist
* [x] Tests added/updated
* [x] Backward compatibility considered
* [x] Clear error messaging
---
* Part of #64609
---
### Was generative AI tooling used to co-author this PR?
* [x] Yes (please specify the tool below)
Codex
GitHub Copilot
ChatGPT