borismo opened a new pull request, #41206:
URL: https://github.com/apache/airflow/pull/41206

   # Problems
   - When a `RedshiftDataOperator` task is configured with `deferrable=True` and `wait_for_completion=True` (the default), it never enters the `deferred` state. Instead, it stays in the `running` state until the statement completes.
   - Also, if `deferrable=True` and `wait_for_completion=False`, the task still enters the `deferred` state after the statement is submitted and waits for the statement to complete.
   
   # Reasons
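   - Currently, if `deferrable=True`, `self.wait_for_completion` is set to `False` in `execute()`, but that attribute is never read afterwards, so the task still waits synchronously for the statement to complete.
   - `execute()` only checks whether the task should be deferred, not whether it should wait for completion, so it defers even when `wait_for_completion=False`.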
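
To make the failure mode concrete, here is a simplified, hypothetical model of the pre-fix decision logic in plain Python. It is not the provider's source: `plan_before_fix` and `ExecutionPlan` are illustrative names, and the model assumes that `execute()` defers only if the statement is not yet finished, and that a statement is still running right after a non-blocking submission.

```python
from typing import NamedTuple


class ExecutionPlan(NamedTuple):
    """Simplified outcome of execute() after the statement is submitted."""

    blocks_worker: bool  # True if the worker polls until the statement completes
    defers: bool  # True if the task enters the `deferred` state


def plan_before_fix(deferrable: bool, wait_for_completion: bool) -> ExecutionPlan:
    """Hypothetical model of the pre-fix logic; arguments mirror the operator attributes."""
    # Local copy of the flag, taken before the attribute is touched.
    local_wait = wait_for_completion
    # Stand-in for self.wait_for_completion: overwritten, but never read again.
    attr_wait = wait_for_completion
    if deferrable and attr_wait:
        attr_wait = False
    # The local copy decides whether submission blocks until completion.
    blocks_worker = local_wait
    # Assumption: a blocking submission returns only once the statement is finished,
    # and a non-blocking one returns while it is still running.
    statement_finished = blocks_worker
    # Assumption: the task defers only if deferrable and the statement is unfinished.
    defers = deferrable and not statement_finished
    return ExecutionPlan(blocks_worker=blocks_worker, defers=defers)


for deferrable in (True, False):
    for wait in (True, False):
        plan = plan_before_fix(deferrable, wait)
        print(f"deferrable={deferrable}, wait_for_completion={wait}: {plan}")
```

Running the loop reproduces both problems described above: `deferrable=True, wait_for_completion=True` blocks without deferring, while `deferrable=True, wait_for_completion=False` defers.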
   
   # Solution
   - When deferrable, overwrite the local `wait_for_completion` variable instead of `self.wait_for_completion`. Also, remove the redundant condition on `self.wait_for_completion` there.
   - Before deferring, also check that the task is supposed to wait for completion.
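
A simplified, hypothetical model of the corrected decision logic, in plain Python. `plan_after_fix` and `ExecutionPlan` are illustrative names rather than the operator's API, and the model assumes the statement is still running right after a non-blocking submission.

```python
from typing import NamedTuple


class ExecutionPlan(NamedTuple):
    """Simplified outcome of execute() after the statement is submitted."""

    blocks_worker: bool  # True if the worker polls until the statement completes
    defers: bool  # True if the task enters the `deferred` state


def plan_after_fix(deferrable: bool, wait_for_completion: bool) -> ExecutionPlan:
    """Hypothetical model of the fixed logic; arguments mirror the operator attributes."""
    local_wait = wait_for_completion
    if deferrable:
        # Overwrite the local flag, so a deferrable task never blocks on submission.
        local_wait = False
    blocks_worker = local_wait
    # Defer only if the task is deferrable AND is supposed to wait for completion.
    # (Assumes the statement is still running right after a non-blocking submission.)
    defers = deferrable and wait_for_completion
    return ExecutionPlan(blocks_worker=blocks_worker, defers=defers)


for deferrable in (True, False):
    for wait in (True, False):
        plan = plan_after_fix(deferrable, wait)
        print(f"deferrable={deferrable}, wait_for_completion={wait}: {plan}")
```

Under these assumptions, only `deferrable=True, wait_for_completion=True` defers, and a deferrable task never blocks the worker.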
   
   # How I tested
   Checked with this simple DAG that the operator now behaves as expected for all four combinations of `deferrable` and `wait_for_completion`:
   ```python
   from airflow.decorators import dag
   from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
   
   @dag(
       "Foo",
   )
   def _():
       for task_id, config in {
           "wait_defer": {"deferrable": True, "wait_for_completion": True},
           "wait_no_defer": {"deferrable": False, "wait_for_completion": True},
           "no_wait_defer": {"deferrable": True, "wait_for_completion": False},
           "no_wait_no_defer": {"deferrable": False, "wait_for_completion": False},
       }.items():
           RedshiftDataOperator(
               task_id=task_id,
               aws_conn_id="redshift_data",
               cluster_identifier="data-warehouse",
               db_user="airflow",
               database="bar",
               sql="""CREATE TEMPORARY TABLE tmp_foo AS
               SELECT *
               FROM some.big_table
               LIMIT 10;""",
               deferrable=config["deferrable"],
               wait_for_completion=config["wait_for_completion"],
           )
   
   _()
   ```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
