This is an automated email from the ASF dual-hosted git repository.
shahar pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 14995f1967f Revert "Fixed CloudComposerDAGRunSensor to return False
when no runs exist in execution_range (#61046)" (#61346)
14995f1967f is described below
commit 14995f1967f299bf4ecb16133fb1c18f49ec1500
Author: Shahar Epstein <[email protected]>
AuthorDate: Mon Feb 2 13:54:45 2026 +0200
Revert "Fixed CloudComposerDAGRunSensor to return False when no runs exist
in execution_range (#61046)" (#61346)
This reverts commit f7bb82e302a2ca7e60249fa9f6f502b510399ba1.
---
.../google/cloud/sensors/cloud_composer.py | 19 ++++++++-------
.../google/cloud/sensors/test_cloud_composer.py | 27 ----------------------
2 files changed, 9 insertions(+), 37 deletions(-)
diff --git
a/providers/google/src/airflow/providers/google/cloud/sensors/cloud_composer.py
b/providers/google/src/airflow/providers/google/cloud/sensors/cloud_composer.py
index ce936465747..8f9da770cbc 100644
---
a/providers/google/src/airflow/providers/google/cloud/sensors/cloud_composer.py
+++
b/providers/google/src/airflow/providers/google/cloud/sensors/cloud_composer.py
@@ -231,17 +231,16 @@ class CloudComposerDAGRunSensor(BaseSensorOperator):
start_date: datetime,
end_date: datetime,
) -> bool:
- found_runs_in_window = False
for dag_run in dag_runs:
- execution_date = parser.parse(
- dag_run["execution_date" if self._composer_airflow_version < 3
else "logical_date"]
- )
-
- if start_date.timestamp() < execution_date.timestamp() <
end_date.timestamp():
- found_runs_in_window = True
- if dag_run["state"] not in self.allowed_states:
- return False
- return found_runs_in_window
+ if (
+ start_date.timestamp()
+ < parser.parse(
+ dag_run["execution_date" if self._composer_airflow_version
< 3 else "logical_date"]
+ ).timestamp()
+ < end_date.timestamp()
+ ) and dag_run["state"] not in self.allowed_states:
+ return False
+ return True
def _get_composer_airflow_version(self) -> int:
"""Return Composer Airflow version."""
diff --git
a/providers/google/tests/unit/google/cloud/sensors/test_cloud_composer.py
b/providers/google/tests/unit/google/cloud/sensors/test_cloud_composer.py
index 1ec63911c05..7f639a59685 100644
--- a/providers/google/tests/unit/google/cloud/sensors/test_cloud_composer.py
+++ b/providers/google/tests/unit/google/cloud/sensors/test_cloud_composer.py
@@ -205,33 +205,6 @@ class TestCloudComposerDAGRunSensor:
assert not task.poke(context={"logical_date": datetime(2024, 5, 23, 0,
0, 0)})
- @pytest.mark.parametrize("use_rest_api", [True, False])
- @pytest.mark.parametrize("composer_airflow_version", [2, 3])
-
@mock.patch("airflow.providers.google.cloud.sensors.cloud_composer.ExecuteAirflowCommandResponse.to_dict")
-
@mock.patch("airflow.providers.google.cloud.sensors.cloud_composer.CloudComposerHook")
- def test_dag_run_outside_execution_range(
- self, mock_hook, to_dict_mode, composer_airflow_version, use_rest_api
- ):
- mock_hook.return_value.wait_command_execution_result.return_value =
TEST_EXEC_RESULT(
- "success", "execution_date" if composer_airflow_version < 3 else
"logical_date"
- )
- mock_hook.return_value.get_dag_runs.return_value = TEST_GET_RESULT(
- "success", "execution_date" if composer_airflow_version < 3 else
"logical_date"
- )
- task = CloudComposerDAGRunSensor(
- task_id="task-id",
- project_id=TEST_PROJECT_ID,
- region=TEST_REGION,
- environment_id=TEST_ENVIRONMENT_ID,
- composer_dag_id="test_dag_id",
- execution_range=[datetime(2024, 5, 22, 17, 0, 0), datetime(2024,
5, 22, 20, 0, 0)],
- allowed_states=["success"],
- use_rest_api=use_rest_api,
- )
- task._composer_airflow_version = composer_airflow_version
-
- assert not task.poke(context={"logical_date": datetime(2024, 5, 23, 0,
0, 0)})
-
class TestCloudComposerExternalTaskSensor:
@pytest.mark.parametrize("composer_airflow_version", [2, 3])