This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 930f165db1 Relax mandatory requirement for start_date when schedule=None (#35356)
930f165db1 is described below
commit 930f165db11e611887275dce17f10eab102f0910
Author: Vishnu <[email protected]>
AuthorDate: Tue Nov 28 07:14:07 2023 +0100
Relax mandatory requirement for start_date when schedule=None (#35356)
* Relax mandatory requirement for start_date when schedule=None
* Updated run_type in unit tests
* Added check for empty start_date and non empty schedule
* Fix the build failures
* Fix the build failures
* Update based on review comments
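For reviewers skimming the patch, a minimal sketch of the intended behavior after this change (dag/task ids below are illustrative, assuming Airflow at this commit):

    from airflow.models.baseoperator import BaseOperator
    from airflow.models.dag import DAG

    # Unscheduled DAG with no start_date anywhere: add_task previously raised
    # AirflowException("DAG is missing the start_date parameter"); it now succeeds.
    dag = DAG(dag_id="manual_only_dag")
    dag.add_task(BaseOperator(task_id="task_without_start_date"))

    # A non-None schedule without a start_date is now rejected at construction time.
    DAG(dag_id="scheduled_dag", schedule="@hourly")  # raises ValueError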
---
airflow/models/dag.py | 11 +++--
airflow/models/dagrun.py | 2 +-
tests/models/test_dag.py | 52 +++++++++++++++---------
tests/providers/google/cloud/sensors/test_gcs.py | 4 +-
4 files changed, 45 insertions(+), 24 deletions(-)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 5daa7bb805..e93d5a55a0 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -548,6 +548,13 @@ class DAG(LoggingMixin):
# sort out DAG's scheduling behavior
scheduling_args = [schedule_interval, timetable, schedule]
+
+ has_scheduling_args = any(a is not NOTSET and bool(a) for a in scheduling_args)
+ has_empty_start_date = not ("start_date" in self.default_args or self.start_date)
+
+ if has_scheduling_args and has_empty_start_date:
+ raise ValueError("DAG is missing the start_date parameter")
+
if not at_most_one(*scheduling_args):
raise ValueError("At most one allowed for args
'schedule_interval', 'timetable', and 'schedule'.")
if schedule_interval is not NOTSET:
@@ -2618,10 +2625,8 @@ class DAG(LoggingMixin):
from airflow.utils.task_group import TaskGroupContext
- if not self.start_date and not task.start_date:
- raise AirflowException("DAG is missing the start_date parameter")
# if the task has no start date, assign it the same as the DAG
- elif not task.start_date:
+ if not task.start_date:
task.start_date = self.start_date
# otherwise, the task will start on the later of its own start date and
# the DAG's start date
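A subtlety worth noting in the constructor check above: an argument only counts as a scheduling arg if it was actually passed (is not NOTSET) and is truthy, so an explicit schedule=None still skips the start_date requirement. A standalone sketch of the predicate (the helper name is invented here; NOTSET is assumed to come from airflow.utils.types):

    from airflow.utils.types import NOTSET

    def has_scheduling_args(schedule_interval=NOTSET, timetable=NOTSET, schedule=NOTSET):
        # Same expression as in the hunk above, pulled out as a free function.
        return any(a is not NOTSET and bool(a) for a in (schedule_interval, timetable, schedule))

    has_scheduling_args()                    # False: nothing passed, start_date stays optional
    has_scheduling_args(schedule=None)       # False: explicit None is falsy
    has_scheduling_args(schedule="@hourly")  # True: start_date is now required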
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 898d266390..b2e70b37a5 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -1116,7 +1116,7 @@ class DagRun(Base, LoggingMixin):
def task_filter(task: Operator) -> bool:
return task.task_id not in task_ids and (
self.is_backfill
- or task.start_date <= self.execution_date
+ or (task.start_date is None or task.start_date <= self.execution_date)
and (task.end_date is None or self.execution_date <= task.end_date)
)
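Since Python's `and` binds tighter than `or`, the date checks above are only consulted for non-backfill runs, and a task whose start_date is None now passes the filter. An equivalent standalone restatement (hypothetical function and parameter names, not the method's real signature):

    def runnable(task, task_ids, is_backfill, execution_date):
        # task.start_date may legitimately be None for tasks of unscheduled DAGs.
        started = task.start_date is None or task.start_date <= execution_date
        not_ended = task.end_date is None or execution_date <= task.end_date
        return task.task_id not in task_ids and (is_backfill or (started and not_ended))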
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index f4b872109f..f7bf1ad6d0 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -765,7 +765,7 @@ class TestDag:
"""
dag_id = "test_schedule_dag_relativedelta"
delta = relativedelta(hours=+1)
- dag = DAG(dag_id=dag_id, schedule=delta)
+ dag = DAG(dag_id=dag_id, schedule=delta, start_date=TEST_DATE)
dag.add_task(BaseOperator(task_id="faketastic", owner="Also fake", start_date=TEST_DATE))
_next = dag.following_schedule(TEST_DATE)
@@ -780,7 +780,7 @@ class TestDag:
"""
dag_id = "test_schedule_dag_relativedelta"
delta = relativedelta(hours=+1)
- dag = DAG(dag_id=dag_id, schedule_interval=delta)
+ dag = DAG(dag_id=dag_id, schedule_interval=delta, start_date=TEST_DATE)
dag.add_task(BaseOperator(task_id="faketastic", owner="Also fake", start_date=TEST_DATE))
_next = dag.following_schedule(TEST_DATE)
@@ -799,7 +799,7 @@ class TestDag:
dag_id = "test_schedule_dag_relativedelta"
delta = relativedelta(hours=+1)
- @dag(dag_id=dag_id, schedule_interval=delta)
+ @dag(dag_id=dag_id, schedule_interval=delta, start_date=TEST_DATE)
def mydag():
BaseOperator(task_id="faketastic", owner="Also fake", start_date=TEST_DATE)
@@ -827,6 +827,20 @@ class TestDag:
when = dag.following_schedule(start)
assert when.isoformat() == "2018-03-25T03:00:00+00:00"
+ def test_create_dagrun_when_schedule_is_none_and_empty_start_date(self):
+ # Check that we don't get an AttributeError 'start_date' for self.start_date when schedule is none
+ dag = DAG("dag_with_none_schedule_and_empty_start_date")
+ dag.add_task(BaseOperator(task_id="task_without_start_date"))
+ dagrun = dag.create_dagrun(
+ state=State.RUNNING, run_type=DagRunType.MANUAL, execution_date=DEFAULT_DATE
+ )
+ assert dagrun is not None
+
+ def test_fail_dag_when_schedule_is_non_none_and_empty_start_date(self):
+ # Check that we get a ValueError 'start_date' for self.start_date when schedule is non-none
+ with pytest.raises(ValueError, match="DAG is missing the start_date parameter"):
+ DAG(dag_id="dag_with_non_none_schedule_and_empty_start_date", schedule="@hourly")
+
def test_following_schedule_datetime_timezone_utc0530(self):
# Check that we don't get an AttributeError 'name' for self.timezone
class UTC0530(datetime.tzinfo):
@@ -942,8 +956,8 @@ class TestDag:
mock_active_runs_of_dags = mock.MagicMock(side_effect=DagRun.active_runs_of_dags)
with mock.patch.object(DagRun, "active_runs_of_dags", mock_active_runs_of_dags):
dags_null_timetable = [
- DAG("dag-interval-None", schedule_interval=None),
- DAG("dag-interval-test", schedule_interval=interval),
+ DAG("dag-interval-None", schedule_interval=None,
start_date=TEST_DATE),
+ DAG("dag-interval-test", schedule_interval=interval,
start_date=TEST_DATE),
]
DAG.bulk_write_to_db(dags_null_timetable, session=settings.Session())
if interval:
@@ -1530,7 +1544,7 @@ class TestDag:
it is called, and not scheduled the second.
"""
dag_id = "test_schedule_dag_once"
- dag = DAG(dag_id=dag_id, schedule="@once")
+ dag = DAG(dag_id=dag_id, schedule="@once", start_date=TEST_DATE)
assert isinstance(dag.timetable, OnceTimetable)
dag.add_task(BaseOperator(task_id="faketastic", owner="Also fake", start_date=TEST_DATE))
@@ -1553,7 +1567,7 @@ class TestDag:
Tests if fractional seconds are stored in the database
"""
dag_id = "test_fractional_seconds"
- dag = DAG(dag_id=dag_id, schedule="@once")
+ dag = DAG(dag_id=dag_id, schedule="@once", start_date=TEST_DATE)
dag.add_task(BaseOperator(task_id="faketastic", owner="Also fake", start_date=TEST_DATE))
start_date = timezone.utcnow()
@@ -1658,25 +1672,25 @@ class TestDag:
def test_timetable_and_description_from_schedule_interval_arg(
self, schedule_interval_arg, expected_timetable, interval_description
):
- dag = DAG("test_schedule_interval_arg", schedule=schedule_interval_arg)
+ dag = DAG("test_schedule_interval_arg",
schedule=schedule_interval_arg, start_date=TEST_DATE)
assert dag.timetable == expected_timetable
assert dag.schedule_interval == schedule_interval_arg
assert dag.timetable.description == interval_description
def test_timetable_and_description_from_dataset(self):
- dag = DAG("test_schedule_interval_arg",
schedule=[Dataset(uri="hello")])
+ dag = DAG("test_schedule_interval_arg",
schedule=[Dataset(uri="hello")], start_date=TEST_DATE)
assert dag.timetable == DatasetTriggeredTimetable()
assert dag.schedule_interval == "Dataset"
assert dag.timetable.description == "Triggered by datasets"
def test_schedule_interval_still_works(self):
- dag = DAG("test_schedule_interval_arg", schedule_interval="*/5 * * *
*")
+ dag = DAG("test_schedule_interval_arg", schedule_interval="*/5 * * *
*", start_date=TEST_DATE)
assert dag.timetable == cron_timetable("*/5 * * * *")
assert dag.schedule_interval == "*/5 * * * *"
assert dag.timetable.description == "Every 5 minutes"
def test_timetable_still_works(self):
- dag = DAG("test_schedule_interval_arg", timetable=cron_timetable("*/6
* * * *"))
+ dag = DAG("test_schedule_interval_arg", timetable=cron_timetable("*/6
* * * *"), start_date=TEST_DATE)
assert dag.timetable == cron_timetable("*/6 * * * *")
assert dag.schedule_interval == "*/6 * * * *"
assert dag.timetable.description == "Every 6 minutes"
@@ -1702,7 +1716,7 @@ class TestDag:
],
)
def test_description_from_timetable(self, timetable, expected_description):
- dag = DAG("test_schedule_interval_description", timetable=timetable)
+ dag = DAG("test_schedule_interval_description", timetable=timetable,
start_date=TEST_DATE)
assert dag.timetable == timetable
assert dag.timetable.description == expected_description
@@ -2449,7 +2463,7 @@ my_postgres_conn:
start_date = TEST_DATE
delta = timedelta(days=1)
- dag = DAG("dummy-dag", schedule=delta)
+ dag = DAG("dummy-dag", schedule=delta, start_date=start_date)
dag_dates = dag.date_range(start_date=start_date, num=3)
assert dag_dates == [
@@ -2502,10 +2516,10 @@ my_postgres_conn:
)
def test_schedule_dag_param(self, kwargs):
with pytest.raises(ValueError, match="At most one"):
- with DAG(dag_id="hello", **kwargs):
+ with DAG(dag_id="hello", start_date=TEST_DATE, **kwargs):
pass
- def test_continuous_schedule_interval_limits_max_active_runs(self):
+ def test_continuous_schedule_interval_linmits_max_active_runs(self):
dag = DAG("continuous", start_date=DEFAULT_DATE,
schedule_interval="@continuous", max_active_runs=1)
assert isinstance(dag.timetable, ContinuousTimetable)
assert dag.max_active_runs == 1
@@ -3010,19 +3024,19 @@ class TestDagDecorator:
@pytest.mark.parametrize("timetable", [NullTimetable(), OnceTimetable()])
def test_dag_timetable_match_schedule_interval(timetable):
- dag = DAG("my-dag", timetable=timetable)
+ dag = DAG("my-dag", timetable=timetable, start_date=DEFAULT_DATE)
assert dag._check_schedule_interval_matches_timetable()
@pytest.mark.parametrize("schedule_interval", [None, "@once", "@daily",
timedelta(days=1)])
def test_dag_schedule_interval_match_timetable(schedule_interval):
- dag = DAG("my-dag", schedule=schedule_interval)
+ dag = DAG("my-dag", schedule=schedule_interval, start_date=DEFAULT_DATE)
assert dag._check_schedule_interval_matches_timetable()
@pytest.mark.parametrize("schedule_interval", [None, "@daily",
timedelta(days=1)])
def test_dag_schedule_interval_change_after_init(schedule_interval):
- dag = DAG("my-dag", timetable=OnceTimetable())
+ dag = DAG("my-dag", timetable=OnceTimetable(), start_date=DEFAULT_DATE)
dag.schedule_interval = schedule_interval
assert not dag._check_schedule_interval_matches_timetable()
@@ -3391,7 +3405,7 @@ def test_get_next_data_interval(
data_interval_end,
expected_data_interval,
):
- dag = DAG(dag_id="test_get_next_data_interval", schedule="@daily")
+ dag = DAG(dag_id="test_get_next_data_interval", schedule="@daily",
start_date=DEFAULT_DATE)
dag_model = DagModel(
dag_id="test_get_next_data_interval",
next_dagrun=logical_date,
diff --git a/tests/providers/google/cloud/sensors/test_gcs.py b/tests/providers/google/cloud/sensors/test_gcs.py
index 422cd8f71a..1d4bbcec87 100644
--- a/tests/providers/google/cloud/sensors/test_gcs.py
+++ b/tests/providers/google/cloud/sensors/test_gcs.py
@@ -227,7 +227,9 @@ class TestGoogleCloudStorageObjectAsyncSensor:
class TestTsFunction:
def test_should_support_datetime(self):
context = {
- "dag": DAG(dag_id=TEST_DAG_ID, schedule=timedelta(days=5)),
+ "dag": DAG(
+ dag_id=TEST_DAG_ID, schedule=timedelta(days=5), start_date=datetime(2019, 2, 14, 0, 0)
+ ),
"execution_date": datetime(2019, 2, 14, 0, 0),
}
result = ts_function(context)