Git4Vishal opened a new issue, #57512:
URL: https://github.com/apache/airflow/issues/57512
### Apache Airflow version
2.11.0
### If "Other Airflow 2/3 version" selected, which one?
_No response_
### What happened?
Summary:
CloudComposerDAGRunSensor incorrectly returns True (success) when no DAG
runs exist within the specified execution_range, potentially causing downstream
tasks to execute without the expected upstream dependencies being met.
### What you think should happen instead?
The sensor should only return True when:
1. At least one DAG run exists within the execution_range, AND
2. All DAG runs within that range are in allowed_states
If no runs exist in the time window, the sensor should return False
(continue waiting).
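The two conditions above can be written as a small predicate. This is a minimal sketch of the expected decision logic only, not the provider's code; `expected_sensor_result` and its arguments are illustrative names, and `runs_in_window` is assumed to already be filtered to the execution_range:

```python
def expected_sensor_result(runs_in_window: list[dict], allowed_states: set[str]) -> bool:
    """Return True only when at least one run exists AND all runs are in allowed states."""
    if not runs_in_window:
        return False  # No runs in the window: keep waiting
    return all(run["state"] in allowed_states for run in runs_in_window)


# No runs in the window -> keep waiting
print(expected_sensor_result([], {"success"}))  # False
# One successful run in the window -> succeed
print(expected_sensor_result([{"state": "success"}], {"success"}))  # True
```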
**Actual Behavior**
The sensor returns True even when no DAG runs exist within the
execution_range, causing downstream tasks to proceed without the expected
dependency being met.
**Impact**
This bug can cause serious production issues:
1. Silent Data Pipeline Failures: Downstream tasks execute without required
upstream data
2. Data Corruption: Tasks process incomplete or missing datasets
3. No Error Alerts: The sensor succeeds silently, providing no indication of
the missing dependency
4. Difficult to Debug: The issue only manifests as data inconsistencies, not
task failures
**Real-World Scenario**
```python
# Daily ETL pipeline on Astronomer depends on a Composer DAG
wait_for_composer = CloudComposerDAGRunSensor(
    task_id="wait_for_composer",
    composer_dag_id="daily_data_ingestion",
    execution_range=[yesterday_9am, yesterday_5pm],  # Yesterday's business hours
)
```
If the Composer DAG _daily_data_ingestion_ never ran yesterday:
- ❌ Buggy Behavior: Sensor returns True → Downstream ETL proceeds with no
data → Data corruption
- ✅ Expected Behavior: Sensor returns False → Keeps waiting → Eventually
times out with clear error
### How to reproduce
**Steps to Reproduce**
**1. Create Test DAG**
```python
from datetime import datetime, timedelta, timezone

from airflow.decorators import dag
from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerDAGRunSensor
from airflow.utils.dates import days_ago


@dag(schedule_interval=None, start_date=days_ago(1), catchup=False)
def test_composer_sensor_bug():
    # Set up a time window in which NO runs exist
    yesterday = datetime.now(timezone.utc) - timedelta(days=1)
    start_time = datetime(yesterday.year, yesterday.month, yesterday.day, 11, 0, 0, tzinfo=timezone.utc)
    end_time = datetime(yesterday.year, yesterday.month, yesterday.day, 14, 0, 0, tzinfo=timezone.utc)

    # Monitor a DAG that has runs, but NONE in the time window
    sensor = CloudComposerDAGRunSensor(
        task_id="test_sensor",
        project_id="my-project",
        region="us-east1",
        environment_id="my-composer-env",
        composer_dag_id="target_dag",  # This DAG exists but did not run in the window
        execution_range=[start_time, end_time],
        poke_interval=10,
        timeout=60,
    )


test_composer_sensor_bug()
```
**2. Prerequisites**
- A Composer environment with a DAG that has historical runs
- Ensure the target DAG has NO runs within the specified time window
- Example: If checking yesterday 11:00-14:00, ensure the DAG didn't run
during that time
**3. Execute DAG**
```bash
airflow dags trigger test_composer_sensor_bug
```
**4. Observe Buggy Behavior**
- **Expected**: Sensor times out after 60 seconds (no runs in window)
- **Actual**: Sensor immediately succeeds (returns True)
**Root Cause Analysis**
**Buggy Code**
**Location**: _airflow/providers/google/cloud/sensors/cloud_composer.py_
```python
def _check_dag_runs_states(
    self,
    dag_runs: list[dict],
    start_date: datetime,
    end_date: datetime,
) -> bool:
    for dag_run in dag_runs:
        if (
            start_date.timestamp()
            < parser.parse(
                dag_run["execution_date" if self._composer_airflow_version < 3 else "logical_date"]
            ).timestamp()
            < end_date.timestamp()
        ) and dag_run["state"] not in self.allowed_states:
            return False
    return True  # ❌ BUG: returns True even if no runs were found in the window!
```
**Why It Fails**
**Scenario**: no DAG runs exist in the time window
- `dag_runs` contains runs, but all of them fall outside the time window
- The loop iterates through every run
- For each run, the window check `start_date < execution_date < end_date` evaluates to False
- The `and` operator short-circuits, so `dag_run["state"] not in self.allowed_states` is never evaluated
- The loop never returns False
- The method falls through to `return True` at the end ❌
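This failure mode can be reproduced outside Airflow with a standalone sketch of the same loop. `buggy_check` is an illustrative stand-in, not the provider's actual method; it parses `logical_date` with the stdlib instead of `dateutil`:

```python
from datetime import datetime, timezone


def buggy_check(dag_runs, start_date, end_date, allowed_states):
    # Same structure as the buggy _check_dag_runs_states: it only rejects runs
    # that are BOTH inside the window AND in a disallowed state.
    for dag_run in dag_runs:
        run_date = datetime.fromisoformat(dag_run["logical_date"])
        if (
            start_date.timestamp() < run_date.timestamp() < end_date.timestamp()
            and dag_run["state"] not in allowed_states
        ):
            return False
    return True  # Reached even when no run fell inside the window


start = datetime(2024, 1, 2, 11, tzinfo=timezone.utc)
end = datetime(2024, 1, 2, 14, tzinfo=timezone.utc)
# A failed run that lies OUTSIDE the window: the sensor should keep waiting,
# but the buggy check succeeds.
runs = [{"logical_date": "2024-01-01T12:00:00+00:00", "state": "failed"}]
print(buggy_check(runs, start, end, {"success"}))  # True -> bug
```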
**Proposed Fix**
**Fixed Code**
```python
def _check_dag_runs_states(
    self,
    dag_runs: list[dict],
    start_date: datetime,
    end_date: datetime,
) -> bool:
    found_runs_in_window = False  # Track whether any run falls in the window
    for dag_run in dag_runs:
        execution_date = parser.parse(
            dag_run["execution_date" if self._composer_airflow_version < 3 else "logical_date"]
        )
        # Check whether this run is within the time window
        if start_date.timestamp() < execution_date.timestamp() < end_date.timestamp():
            found_runs_in_window = True
            # If any run in the window is not in an allowed state, fail immediately
            if dag_run["state"] not in self.allowed_states:
                return False
    # ✅ FIX: only return True if at least one run was found in the window
    return found_runs_in_window
```
**Key Changes**
1. Added a `found_runs_in_window` flag that tracks whether at least one run exists in the time window
2. Separated the compound condition so the flag is set independently of the state check
3. Changed the final return so the method only returns True when runs were found in the window AND all of them are in `allowed_states`
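The fixed logic can be exercised with the same kind of standalone sketch as above. `fixed_check` mirrors the proposed method but is an illustrative stand-in (stdlib date parsing, free function instead of a sensor method); the sample runs are made up:

```python
from datetime import datetime, timezone


def fixed_check(dag_runs, start_date, end_date, allowed_states):
    # Mirrors the proposed fix: track whether any run falls inside the window.
    found_runs_in_window = False
    for dag_run in dag_runs:
        run_date = datetime.fromisoformat(dag_run["logical_date"])
        if start_date.timestamp() < run_date.timestamp() < end_date.timestamp():
            found_runs_in_window = True
            if dag_run["state"] not in allowed_states:
                return False
    return found_runs_in_window


start = datetime(2024, 1, 2, 11, tzinfo=timezone.utc)
end = datetime(2024, 1, 2, 14, tzinfo=timezone.utc)
outside = {"logical_date": "2024-01-01T12:00:00+00:00", "state": "failed"}
inside_ok = {"logical_date": "2024-01-02T12:00:00+00:00", "state": "success"}
inside_bad = {"logical_date": "2024-01-02T13:00:00+00:00", "state": "failed"}

print(fixed_check([outside], start, end, {"success"}))             # False: no run in window
print(fixed_check([outside, inside_ok], start, end, {"success"}))  # True: window run succeeded
print(fixed_check([inside_bad], start, end, {"success"}))          # False: window run failed
```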
### Operating System
```
/usr/local/airflow$ cat /etc/os-release
PRETTY_NAME="Debian GNU/Linux 12 (bookworm)"
NAME="Debian GNU/Linux"
VERSION_ID="12"
VERSION="12 (bookworm)"
VERSION_CODENAME=bookworm
ID=debian
HOME_URL="https://www.debian.org/"
SUPPORT_URL="https://www.debian.org/support"
BUG_REPORT_URL="https://bugs.debian.org/"
```
### Versions of Apache Airflow Providers
apache-airflow-providers-google==15.1.0
### Deployment
Astronomer
### Deployment details
Astro Runtime
13.2.0 (Based on Airflow 2.11.0)
### Anything else?
```diff
diff --git a/airflow/providers/google/cloud/sensors/cloud_composer.py b/airflow/providers/google/cloud/sensors/cloud_composer.py
index abc123..def456 100644
--- a/airflow/providers/google/cloud/sensors/cloud_composer.py
+++ b/airflow/providers/google/cloud/sensors/cloud_composer.py
@@ -XXX,XX +XXX,XX @@ class CloudComposerDAGRunSensor(BaseSensorOperator):
     def _check_dag_runs_states(
         self,
         dag_runs: list[dict],
         start_date: datetime,
         end_date: datetime,
     ) -> bool:
+        found_runs_in_window = False
+
         for dag_run in dag_runs:
-            if (
-                start_date.timestamp()
-                < parser.parse(
-                    dag_run["execution_date" if self._composer_airflow_version < 3 else "logical_date"]
-                ).timestamp()
-                < end_date.timestamp()
-            ) and dag_run["state"] not in self.allowed_states:
-                return False
-        return True
+            execution_date = parser.parse(
+                dag_run["execution_date" if self._composer_airflow_version < 3 else "logical_date"]
+            )
+
+            if start_date.timestamp() < execution_date.timestamp() < end_date.timestamp():
+                found_runs_in_window = True
+
+                if dag_run["state"] not in self.allowed_states:
+                    return False
+
+        return found_runs_in_window
```
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)