jayachandrakasarla opened a new pull request, #68542:
URL: https://github.com/apache/airflow/pull/68542

   Closes #68537  
   
   As described in the issue, `Variable.set()` and `Variable.delete()` in the
   Task SDK swallowed `AirflowRuntimeError`: they logged the exception but did
   not re-raise it, so a task reported success even when the Execution API
   rejected the variable write.
   
   Added `raise` to both except blocks so errors now propagate and fail the 
task, consistent with the existing behavior of `Variable.get()`.
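
The log-and-re-raise pattern described above can be sketched roughly as follows. This is a minimal, standalone illustration, not the Task SDK code: `RuntimeApiError`, `set_variable`, and `rejecting_send` are hypothetical stand-ins for `AirflowRuntimeError`, `Variable.set()`, and the call to the Execution API.

```python
import logging

log = logging.getLogger(__name__)


class RuntimeApiError(Exception):
    """Hypothetical stand-in for AirflowRuntimeError."""


def set_variable(key, value, send):
    """Write a variable through `send`, a hypothetical callable that reaches the API."""
    try:
        send(key, value)
    except RuntimeApiError:
        # Keep the log entry, then re-raise so the caller (the task) fails.
        log.exception("Unable to set variable %s", key)
        raise


def rejecting_send(key, value):
    """Hypothetical transport that simulates a rejected write."""
    raise RuntimeApiError("403 Forbidden")
```

Without the bare `raise`, `set_variable` would return `None` after logging, and the caller could not tell a rejected write from a successful one.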
   
   Also added unit tests:
   - `test_var_set_raises_on_runtime_error` - verifies `Variable.set()` 
re-raises `AirflowRuntimeError` when the underlying call fails (e.g. a 403 from 
the API server).
   - `test_var_delete_raises_on_runtime_error` - same for `Variable.delete()`.
   - `test_var_delete` - basic success-path coverage for `Variable.delete()`, 
which was previously untested.
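
For illustration, the shape of such an error-path test might look like the sketch below. It does not reproduce the tests in this PR: `FakeRuntimeError` and `delete_variable` are hypothetical stand-ins for `AirflowRuntimeError` and `Variable.delete()`, and the failing call is simulated with `unittest.mock`.

```python
from unittest import mock


class FakeRuntimeError(Exception):
    """Hypothetical stand-in for AirflowRuntimeError."""


def delete_variable(key, send):
    """Hypothetical stand-in for Variable.delete(); re-raises on failure."""
    try:
        send(key)
    except FakeRuntimeError:
        raise


def test_delete_raises_on_runtime_error():
    # The mocked transport fails the way a 403 from the API server would.
    send = mock.Mock(side_effect=FakeRuntimeError("403 Forbidden"))
    try:
        delete_variable("abc", send)
    except FakeRuntimeError:
        pass
    else:
        raise AssertionError("expected FakeRuntimeError to propagate")
    # The underlying call was attempted exactly once with the given key.
    send.assert_called_once_with("abc")
```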
   
   
   Here's a DAG for testing:
   ```python
   import datetime
   from airflow.sdk import dag, task
   
   @dag(
       dag_id="test_variable_error_propagation",
       start_date=datetime.datetime(2026, 1, 1),
       schedule=None,
       catchup=False,
   )
   def test_variable_error_propagation():
       @task
       def test_variable_set_and_delete_success():
           from airflow.sdk import Variable
   
           Variable.set("abc", "efg")
           Variable.delete("abc")
           print("Variable.delete succeeded")
   
       test_variable_set_and_delete_success()
   
   
   test_variable_error_propagation()
   ```
   
   Use the following test middleware plugin with Breeze to simulate 403 responses from the Execution API:
   ```python
   from __future__ import annotations
   
   import json
   
   from airflow.plugins_manager import AirflowPlugin
   
   
   class RejectVariableWritesMiddleware:
       """Intercept PUT/DELETE /execution/variables/* and return 403."""
   
       def __init__(self, app) -> None:
           self.app = app
   
       async def __call__(self, scope, receive, send) -> None:
           if scope["type"] == "http":
               path: str = scope.get("path", "")
               method: str = scope.get("method", "")
               if method in ("PUT", "DELETE") and path.startswith("/execution/variables/"):
                   body = json.dumps({"detail": "Variable writes rejected by test middleware"}).encode()
                   await send(
                       {
                           "type": "http.response.start",
                           "status": 403,
                           "headers": [
                               [b"content-type", b"application/json"],
                               [b"content-length", str(len(body)).encode()],
                           ],
                       }
                   )
                   await send({"type": "http.response.body", "body": body})
                   return
           await self.app(scope, receive, send)
   
   
   class RejectVariableWritesPlugin(AirflowPlugin):
       name = "reject_variable_writes"
       fastapi_root_middlewares = [
           {
               "name": "RejectVariableWritesMiddleware",
               "middleware": RejectVariableWritesMiddleware,
           }
       ]
   ```
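
The middleware's routing logic can also be sanity-checked without a running API server by calling it directly as an ASGI callable. The sketch below assumes a condensed copy of the same logic: `RejectWrites`, `ok_app`, and `status_for` are hypothetical names used only for this check.

```python
import asyncio
import json


class RejectWrites:
    """Condensed copy of the middleware's logic, for a standalone check."""

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if (
            scope["type"] == "http"
            and scope.get("method") in ("PUT", "DELETE")
            and scope.get("path", "").startswith("/execution/variables/")
        ):
            body = json.dumps({"detail": "rejected"}).encode()
            await send({"type": "http.response.start", "status": 403, "headers": []})
            await send({"type": "http.response.body", "body": body})
            return
        await self.app(scope, receive, send)


async def ok_app(scope, receive, send):
    """Hypothetical downstream app that always answers 200."""
    await send({"type": "http.response.start", "status": 200, "headers": []})
    await send({"type": "http.response.body", "body": b"ok"})


def status_for(method, path):
    """Drive the middleware once and return the response status it produced."""
    sent = []

    async def send(message):
        sent.append(message)

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    scope = {"type": "http", "method": method, "path": path}
    asyncio.run(RejectWrites(ok_app)(scope, receive, send))
    return sent[0]["status"]
```

Calling `status_for("PUT", "/execution/variables/abc")` exercises the rejection branch, while a `GET` on the same path falls through to the downstream app.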
   
   Was generative AI tooling used to co-author this PR?
   
   [X] Yes 
   
   Used Claude to generate the middleware plugin to test 403 API errors.

