github-actions[bot] opened a new pull request, #51199:
URL: https://github.com/apache/airflow/pull/51199
When using `dag.test()` with deferred tasks, tasks that complete their
trigger execution were incorrectly being set to `SUCCESS` state instead of
`SCHEDULED` state. This prevented task resumption!
Dag used to test:
```python
from datetime import datetime, timedelta, timezone
from typing import Any
import pendulum
from airflow.providers.standard.triggers.temporal import DateTimeTrigger
from airflow.sdk import Context, task, BaseOperator, DAG
class DummyOperator(BaseOperator):
def execute(self, context: Context):
self.defer(
trigger=DateTimeTrigger(
moment=datetime.now(timezone.utc) + timedelta(seconds=2),
),
method_name="execute_complet",
)
def execute_complet(self, context: Context, event: Any = None):
assert event is not None
return "test"
@task
def dummy_task(param):
print("DEBUG")
assert param == "test", "Parameter should be 'test'"
with DAG(
dag_id="example_debug",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
) as dag:
task1 = DummyOperator(task_id="task1")
task2 = dummy_task(task1.output)
task1 >> task2
if __name__ == "__main__":
dag.test()
```
(cherry picked from commit 1b83f7135d4b5167972ec32752642924e8e0a55a)
Co-authored-by: Kaxil Naik <[email protected]>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]