This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 454b5bbf52 Fix RedshiftDataOperator not running in deferred mode when it should (#41206)
454b5bbf52 is described below
commit 454b5bbf529ea2a9b0b69a871803ff8920af0bb5
Author: Boris Morel <[email protected]>
AuthorDate: Wed Aug 7 19:35:03 2024 +0800
Fix RedshiftDataOperator not running in deferred mode when it should (#41206)
* Ensure operator goes into deferrable mode
* Remove commented out code
* Test when not waiting for completion
* Add entry to changelog
* Rephrase warning
---
airflow/providers/amazon/CHANGELOG.rst | 14 ++++++++
.../amazon/aws/operators/redshift_data.py | 6 ++--
.../amazon/aws/operators/test_redshift_data.py | 38 ++++++++++++++++++++--
3 files changed, 53 insertions(+), 5 deletions(-)
diff --git a/airflow/providers/amazon/CHANGELOG.rst b/airflow/providers/amazon/CHANGELOG.rst
index ed3b50f1ab..1dfc37909c 100644
--- a/airflow/providers/amazon/CHANGELOG.rst
+++ b/airflow/providers/amazon/CHANGELOG.rst
@@ -26,6 +26,20 @@
Changelog
---------
+Main
+......
+
+.. warning:: When deferrable mode was introduced for ``RedshiftDataOperator``, in version 8.17.0, tasks configured with
+ ``deferrable=True`` and ``wait_for_completion=True`` wouldn't enter the deferred state. Instead, the task would occupy
+ an executor slot until the statement was completed. A workaround may have been to set ``wait_for_completion=False``.
+ In this version, tasks set up with ``wait_for_completion=False`` will not wait anymore, regardless of the value of
+ ``deferrable``.
+
+Bug Fixes
+~~~~~~~~~
+
+* ``Fix deferred mode for 'RedshiftDataOperator' (#41206)``
+
8.27.0
......
diff --git a/airflow/providers/amazon/aws/operators/redshift_data.py b/airflow/providers/amazon/aws/operators/redshift_data.py
index 54e3c2c7ae..45fee2a919 100644
--- a/airflow/providers/amazon/aws/operators/redshift_data.py
+++ b/airflow/providers/amazon/aws/operators/redshift_data.py
@@ -127,8 +127,8 @@ class RedshiftDataOperator(AwsBaseOperator[RedshiftDataHook]):
# Set wait_for_completion to False so that it waits for the status in the deferred task.
wait_for_completion = self.wait_for_completion
- if self.deferrable and self.wait_for_completion:
- self.wait_for_completion = False
+ if self.deferrable:
+ wait_for_completion = False
self.statement_id = self.hook.execute_query(
database=self.database,
@@ -144,7 +144,7 @@ class RedshiftDataOperator(AwsBaseOperator[RedshiftDataHook]):
poll_interval=self.poll_interval,
)
- if self.deferrable:
+ if self.deferrable and self.wait_for_completion:
is_finished = self.hook.check_query_is_finished(self.statement_id)
if not is_finished:
self.defer(
diff --git a/tests/providers/amazon/aws/operators/test_redshift_data.py b/tests/providers/amazon/aws/operators/test_redshift_data.py
index a02515441b..fa021395a4 100644
--- a/tests/providers/amazon/aws/operators/test_redshift_data.py
+++ b/tests/providers/amazon/aws/operators/test_redshift_data.py
@@ -51,7 +51,7 @@ def deferrable_operator():
secret_arn=secret_arn,
statement_name=statement_name,
parameters=parameters,
- wait_for_completion=False,
+ wait_for_completion=True,
poll_interval=poll_interval,
deferrable=True,
)
@@ -276,7 +276,6 @@ class TestRedshiftDataOperator:
poll_interval=poll_interval,
)
- # @mock.patch("airflow.providers.amazon.aws.operators.redshift_data.RedshiftDataOperator.defer")
@mock.patch(
"airflow.providers.amazon.aws.hooks.redshift_data.RedshiftDataHook.check_query_is_finished",
return_value=False,
@@ -315,3 +314,38 @@ class TestRedshiftDataOperator:
== "uuid"
)
mock_log_info.assert_called_with("%s completed successfully.", TASK_ID)
+
+ @mock.patch("airflow.providers.amazon.aws.operators.redshift_data.RedshiftDataOperator.defer")
+ @mock.patch("airflow.providers.amazon.aws.hooks.redshift_data.RedshiftDataHook.check_query_is_finished")
+ @mock.patch("airflow.providers.amazon.aws.hooks.redshift_data.RedshiftDataHook.execute_query")
+ def test_no_wait_for_completion(self, mock_exec_query, mock_check_query_is_finished, mock_defer):
+ """Tests that the operator does not check for completion nor defers when wait_for_completion is False,
+ no matter the value of deferrable"""
+ cluster_identifier = "cluster_identifier"
+ db_user = "db_user"
+ secret_arn = "secret_arn"
+ statement_name = "statement_name"
+ parameters = [{"name": "id", "value": "1"}]
+ poll_interval = 5
+
+ wait_for_completion = False
+
+ for deferrable in [True, False]:
+ operator = RedshiftDataOperator(
+ aws_conn_id=CONN_ID,
+ task_id=TASK_ID,
+ sql=SQL,
+ database=DATABASE,
+ cluster_identifier=cluster_identifier,
+ db_user=db_user,
+ secret_arn=secret_arn,
+ statement_name=statement_name,
+ parameters=parameters,
+ wait_for_completion=wait_for_completion,
+ poll_interval=poll_interval,
+ deferrable=deferrable,
+ )
+ operator.execute(None)
+
+ assert not mock_check_query_is_finished.called
+ assert not mock_defer.called