ephraimbuddy commented on issue #25681:
URL: https://github.com/apache/airflow/issues/25681#issuecomment-1218581639
One issue with the provided DAG is that it uses a dynamic start_date. With a
dynamic start_date, the DAG hash changes every time the file processor parses
the file, so it never matches the previously stored hash. This leads the
scheduler to call DagRun.verify_integrity.
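To illustrate why a dynamic start_date defeats the hash comparison, here is a minimal pure-Python sketch. The hash is a simplified stand-in (md5 over sorted JSON) for the scheduler's serialized-DAG hash, not Airflow's actual code, and `fake_dag_hash`, `parse_dynamic` and `parse_static` are hypothetical names. It models two file-processor passes 30 seconds apart.

```python
import hashlib
import json
from datetime import datetime, timedelta, timezone


def fake_dag_hash(dag_fields: dict) -> str:
    """Hypothetical stand-in for a serialized-DAG hash: md5 over sorted JSON."""
    payload = json.dumps(dag_fields, sort_keys=True, default=str)
    return hashlib.md5(payload.encode("utf-8")).hexdigest()


def parse_dynamic(now: datetime) -> dict:
    # Dynamic start_date: computed relative to parse time, so it changes on every parse.
    return {"dag_id": "example", "start_date": now - timedelta(days=1)}


def parse_static() -> dict:
    # Static start_date: a fixed literal, identical on every parse.
    return {"dag_id": "example", "start_date": datetime(2022, 8, 1, tzinfo=timezone.utc)}


# Two simulated file-processor passes, 30 seconds apart.
t1 = datetime(2022, 8, 17, 12, 0, 0, tzinfo=timezone.utc)
t2 = t1 + timedelta(seconds=30)

dynamic_changed = fake_dag_hash(parse_dynamic(t1)) != fake_dag_hash(parse_dynamic(t2))
static_changed = fake_dag_hash(parse_static()) != fake_dag_hash(parse_static())
print(dynamic_changed, static_changed)  # True False
```

Only the dynamic variant produces a new hash on each pass, which is what keeps pushing the scheduler into verify_integrity.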
If you change the DAG to use a static start_date and apply the fix below, the
issue is resolved:
```diff
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 8dcbfcfc60..c55be069e3 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -1039,7 +1039,8 @@ class DagRun(Base, LoggingMixin):
         # Check if we have some missing indexes to create ti for
         missing_indexes: Dict["MappedOperator", Sequence[int]] = defaultdict(list)
         for k, v in existing_indexes.items():
-            missing_indexes.update({k: list(set(expected_indexes[k]).difference(v))})
+            if len(v):  # If list is empty, don't record
+                missing_indexes.update({k: list(set(expected_indexes[k]).difference(v))})
         return task_ids, missing_indexes
 
     def _get_task_creator(
@@ -1199,7 +1200,8 @@ class DagRun(Base, LoggingMixin):
             new_indexes[task] = range(new_length)
         missing_indexes: Dict[MappedOperator, Sequence[int]] = defaultdict(list)
         for k, v in existing_indexes.items():
-            missing_indexes.update({k: list(set(new_indexes[k]).difference(v))})
+            if len(v):
+                missing_indexes.update({k: list(set(new_indexes[k]).difference(v))})
         return missing_indexes
 
     @staticmethod
```
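The effect of the patched loops can be checked in isolation with a small sketch that copies the loop logic from the diff. Plain strings stand in for the `MappedOperator` keys, `missing_before` and `missing_after` are hypothetical helper names, and `sorted()` replaces `list()` only so the printed output is deterministic.

```python
from collections import defaultdict
from typing import Dict, List, Sequence


def missing_before(expected: Dict[str, Sequence[int]], existing: Dict[str, List[int]]) -> Dict[str, List[int]]:
    # Original loop: every key in `existing` gets an entry, even when its
    # list of existing map indexes is empty.
    missing: Dict[str, List[int]] = defaultdict(list)
    for k, v in existing.items():
        missing.update({k: sorted(set(expected[k]).difference(v))})
    return missing


def missing_after(expected: Dict[str, Sequence[int]], existing: Dict[str, List[int]]) -> Dict[str, List[int]]:
    # Patched loop: keys whose existing list is empty are skipped.
    missing: Dict[str, List[int]] = defaultdict(list)
    for k, v in existing.items():
        if len(v):
            missing.update({k: sorted(set(expected[k]).difference(v))})
    return missing


expected = {"mapped_a": range(3), "mapped_b": range(2)}
existing = {"mapped_a": [0, 1], "mapped_b": []}

print(dict(missing_before(expected, existing)))  # {'mapped_a': [2], 'mapped_b': [0, 1]}
print(dict(missing_after(expected, existing)))   # {'mapped_a': [2]}
```

With the patch, a key whose existing index list is empty no longer yields a "missing" entry covering every expected index.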
I will make a PR that stops `missing_indexes` from being updated when the
value is empty, and then we can work on the dynamic start_date issue for 2.4.0.