This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 454b5bbf52 Fix RedshiftDataOperator not running in deferred mode when it should (#41206)
454b5bbf52 is described below

commit 454b5bbf529ea2a9b0b69a871803ff8920af0bb5
Author: Boris Morel <[email protected]>
AuthorDate: Wed Aug 7 19:35:03 2024 +0800

    Fix RedshiftDataOperator not running in deferred mode when it should (#41206)
    
    * Ensure operator goes into deferrable mode
    
    * Remove commented out code
    
    * Test when not waiting for completion
    
    * Add entry to changelog
    
    * Rephrase warning
---
 airflow/providers/amazon/CHANGELOG.rst             | 14 ++++++++
 .../amazon/aws/operators/redshift_data.py          |  6 ++--
 .../amazon/aws/operators/test_redshift_data.py     | 38 ++++++++++++++++++++--
 3 files changed, 53 insertions(+), 5 deletions(-)

diff --git a/airflow/providers/amazon/CHANGELOG.rst b/airflow/providers/amazon/CHANGELOG.rst
index ed3b50f1ab..1dfc37909c 100644
--- a/airflow/providers/amazon/CHANGELOG.rst
+++ b/airflow/providers/amazon/CHANGELOG.rst
@@ -26,6 +26,20 @@
 Changelog
 ---------
 
+Main
+......
+
+.. warning:: When deferrable mode was introduced for ``RedshiftDataOperator``, in version 8.17.0, tasks configured with
+  ``deferrable=True`` and ``wait_for_completion=True`` wouldn't enter the deferred state. Instead, the task would occupy
+  an executor slot until the statement was completed. A workaround may have been to set ``wait_for_completion=False``.
+  In this version, tasks set up with ``wait_for_completion=False`` will not wait anymore, regardless of the value of
+  ``deferrable``.
+
+Bug Fixes
+~~~~~~~~~
+
+* ``Fix deferred mode for 'RedshiftDataOperator' (#41206)``
+
 8.27.0
 ......
 
diff --git a/airflow/providers/amazon/aws/operators/redshift_data.py b/airflow/providers/amazon/aws/operators/redshift_data.py
index 54e3c2c7ae..45fee2a919 100644
--- a/airflow/providers/amazon/aws/operators/redshift_data.py
+++ b/airflow/providers/amazon/aws/operators/redshift_data.py
@@ -127,8 +127,8 @@ class RedshiftDataOperator(AwsBaseOperator[RedshiftDataHook]):
 
         # Set wait_for_completion to False so that it waits for the status in the deferred task.
         wait_for_completion = self.wait_for_completion
-        if self.deferrable and self.wait_for_completion:
-            self.wait_for_completion = False
+        if self.deferrable:
+            wait_for_completion = False
 
         self.statement_id = self.hook.execute_query(
             database=self.database,
@@ -144,7 +144,7 @@ class RedshiftDataOperator(AwsBaseOperator[RedshiftDataHook]):
             poll_interval=self.poll_interval,
         )
 
-        if self.deferrable:
+        if self.deferrable and self.wait_for_completion:
             is_finished = self.hook.check_query_is_finished(self.statement_id)
             if not is_finished:
                 self.defer(
diff --git a/tests/providers/amazon/aws/operators/test_redshift_data.py b/tests/providers/amazon/aws/operators/test_redshift_data.py
index a02515441b..fa021395a4 100644
--- a/tests/providers/amazon/aws/operators/test_redshift_data.py
+++ b/tests/providers/amazon/aws/operators/test_redshift_data.py
@@ -51,7 +51,7 @@ def deferrable_operator():
         secret_arn=secret_arn,
         statement_name=statement_name,
         parameters=parameters,
-        wait_for_completion=False,
+        wait_for_completion=True,
         poll_interval=poll_interval,
         deferrable=True,
     )
@@ -276,7 +276,6 @@ class TestRedshiftDataOperator:
             poll_interval=poll_interval,
         )
 
-    # @mock.patch("airflow.providers.amazon.aws.operators.redshift_data.RedshiftDataOperator.defer")
     @mock.patch(
         "airflow.providers.amazon.aws.hooks.redshift_data.RedshiftDataHook.check_query_is_finished",
         return_value=False,
@@ -315,3 +314,38 @@ class TestRedshiftDataOperator:
                 == "uuid"
             )
         mock_log_info.assert_called_with("%s completed successfully.", TASK_ID)
+
+    @mock.patch("airflow.providers.amazon.aws.operators.redshift_data.RedshiftDataOperator.defer")
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift_data.RedshiftDataHook.check_query_is_finished")
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift_data.RedshiftDataHook.execute_query")
+    def test_no_wait_for_completion(self, mock_exec_query, mock_check_query_is_finished, mock_defer):
+        """Tests that the operator does not check for completion nor defers when wait_for_completion is False,
+        no matter the value of deferrable"""
+        cluster_identifier = "cluster_identifier"
+        db_user = "db_user"
+        secret_arn = "secret_arn"
+        statement_name = "statement_name"
+        parameters = [{"name": "id", "value": "1"}]
+        poll_interval = 5
+
+        wait_for_completion = False
+
+        for deferrable in [True, False]:
+            operator = RedshiftDataOperator(
+                aws_conn_id=CONN_ID,
+                task_id=TASK_ID,
+                sql=SQL,
+                database=DATABASE,
+                cluster_identifier=cluster_identifier,
+                db_user=db_user,
+                secret_arn=secret_arn,
+                statement_name=statement_name,
+                parameters=parameters,
+                wait_for_completion=wait_for_completion,
+                poll_interval=poll_interval,
+                deferrable=deferrable,
+            )
+            operator.execute(None)
+
+            assert not mock_check_query_is_finished.called
+            assert not mock_defer.called

Reply via email to