borismo opened a new pull request, #41206:
URL: https://github.com/apache/airflow/pull/41206
# Problems
- When a `RedshiftDataOperator` task is configured with `deferrable=True`
and `wait_for_completion=True` (the default), it never enters the `deferred`
state. Instead, it stays in the `running` state until the statement completes.
- Conversely, if `wait_for_completion=False` and `deferrable=True`, the task
still enters the `deferred` state after the statement is submitted and waits
for the statement to complete.
# Reasons
- Currently, if `deferrable=True`, `self.wait_for_completion` is set to
`False` in `execute()`, but it is never read afterwards.
- `execute()` does not check whether the task should wait for completion, only
whether it should be deferred.
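To make the two reasons concrete, here is a rough model of the pre-fix decision logic as a pure function. It is inferred only from the description above, not copied from the operator's source; `Plan` and `pre_fix_plan` are hypothetical names introduced for illustration.

```python
from typing import NamedTuple


class Plan(NamedTuple):
    hook_blocks: bool     # the synchronous submit call waits for the statement
    tries_to_defer: bool  # the operator attempts to defer afterwards


def pre_fix_plan(deferrable: bool, wait_for_completion: bool) -> Plan:
    """Hypothetical model of the behavior described before the fix."""
    # The overwritten attribute is never read, so the value that reaches
    # the blocking call is unaffected by `deferrable`.
    hook_wait = wait_for_completion
    # Only `deferrable` is consulted when deciding whether to defer.
    return Plan(hook_blocks=hook_wait, tries_to_defer=deferrable)
```

Under this model, `deferrable=True, wait_for_completion=True` blocks in the submit call, so the statement has presumably already finished by the time deferral is considered, which would match the task staying in `running`. With `wait_for_completion=False` the task still tries to defer, which matches the second problem.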
# Solution
- Overwrite `wait_for_completion` instead of `self.wait_for_completion` when
the operator is deferrable. Also, remove the redundant condition on
`self.wait_for_completion`.
- Before deferring, also check that the task is supposed to wait for
completion.
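The intended behavior after these two changes can be sketched as a small pure function. This is a simplified illustration under stated assumptions, not the operator's actual code: `Plan` and `fixed_plan` are hypothetical names, and the sketch ignores the case where the statement has already finished before deferral is considered.

```python
from typing import NamedTuple


class Plan(NamedTuple):
    hook_blocks: bool  # the synchronous submit call waits for the statement
    defers: bool       # the operator hands the wait over to a trigger


def fixed_plan(deferrable: bool, wait_for_completion: bool) -> Plan:
    """Hypothetical model of the decision logic described in the solution."""
    # Overwrite the value passed to the submit call (not the attribute),
    # so a deferrable task never blocks the worker.
    hook_wait = wait_for_completion
    if deferrable:
        hook_wait = False
    # Defer only when the task is both deferrable and expected to wait.
    should_defer = deferrable and wait_for_completion
    return Plan(hook_blocks=hook_wait, defers=should_defer)
```

In this model, only `deferrable=True, wait_for_completion=True` defers, only `deferrable=False, wait_for_completion=True` blocks, and both `wait_for_completion=False` combinations submit the statement and return immediately.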
# How I tested
I checked with this simple DAG that the operator now behaves as expected for
all four combinations of `deferrable` and `wait_for_completion`:
```python
from airflow.decorators import dag
from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator


@dag(
    "Foo",
)
def _():
    for task_id, config in {
        "wait_defer": {"deferrable": True, "wait_for_completion": True},
        "wait_no_defer": {"deferrable": False, "wait_for_completion": True},
        "no_wait_defer": {"deferrable": True, "wait_for_completion": False},
        "no_wait_no_defer": {"deferrable": False, "wait_for_completion": False},
    }.items():
        RedshiftDataOperator(
            task_id=task_id,
            aws_conn_id="redshift_data",
            cluster_identifier="data-warehouse",
            db_user="airflow",
            database="bar",
            sql="""CREATE TEMPORARY TABLE tmp_foo AS
                   SELECT *
                   FROM some.big_table
                   LIMIT 10;""",
            deferrable=config["deferrable"],
            wait_for_completion=config["wait_for_completion"],
        )


_()
```