ephraimbuddy commented on code in PR #45370:
URL: https://github.com/apache/airflow/pull/45370#discussion_r1914416797
##########
docs/apache-airflow/best-practices.rst:
##########
@@ -725,13 +725,14 @@ This is an example test want to verify the structure of a
code-generated DAG aga
from airflow import DAG
from airflow.utils.state import DagRunState, TaskInstanceState
- from airflow.utils.types import DagRunType
+ from airflow.utils.types import DagRunTriggeredByType, DagRunType
DATA_INTERVAL_START = pendulum.datetime(2021, 9, 13, tz="UTC")
DATA_INTERVAL_END = DATA_INTERVAL_START + datetime.timedelta(days=1)
TEST_DAG_ID = "my_custom_operator_dag"
TEST_TASK_ID = "my_custom_operator_task"
+ TEST_RUN_ID = "my_custom_oeprator_dag_run"
Review Comment:
```suggestion
TEST_RUN_ID = "my_custom_operator_dag_run"
```
##########
airflow/models/dag.py:
##########
@@ -1727,87 +1727,54 @@ def add_logger_if_needed(ti: TaskInstance):
@provide_session
def create_dagrun(
self,
- state: DagRunState,
*,
- triggered_by: DagRunTriggeredByType | None,
- logical_date: datetime | None = None,
- run_id: str | None = None,
- start_date: datetime | None = None,
- external_trigger: bool | None = False,
+ run_id: str,
+ logical_date: datetime,
+ data_interval: tuple[datetime, datetime],
conf: dict | None = None,
- run_type: DagRunType | None = None,
- session: Session = NEW_SESSION,
+ run_type: DagRunType,
+ triggered_by: DagRunTriggeredByType,
+ external_trigger: bool = False,
dag_version: DagVersion | None = None,
+ state: DagRunState,
+ start_date: datetime | None = None,
creating_job_id: int | None = None,
- data_interval: tuple[datetime, datetime] | None = None,
backfill_id: int | None = None,
- ):
+ session: Session = NEW_SESSION,
+ ) -> DagRun:
"""
- Create a dag run from this dag including the tasks associated with
this dag.
-
- Returns the dag run.
+ Create a run for this DAG to run its tasks.
- :param state: the state of the dag run
- :param triggered_by: The entity which triggers the DagRun
- :param run_id: defines the run id for this dag run
- :param run_type: type of DagRun
- :param logical_date: the logical date of this dag run
:param start_date: the date this dag run should be evaluated
- :param external_trigger: whether this dag run is externally triggered
:param conf: Dict containing configuration/parameters to pass to the
DAG
- :param creating_job_id: id of the job creating this DagRun
- :param session: database session
- :param dag_version: The DagVersion object for this run
- :param data_interval: Data interval of the DagRun
- :param backfill_id: id of the backfill run if one exists
+ :param creating_job_id: ID of the job creating this DagRun
+ :param backfill_id: ID of the backfill run if one exists
+ :return: The created DAG run.
+
+ :meta private:
"""
logical_date = timezone.coerce_datetime(logical_date)
if data_interval and not isinstance(data_interval, DataInterval):
data_interval = DataInterval(*map(timezone.coerce_datetime,
data_interval))
- if data_interval is None and logical_date is not None:
- raise ValueError(
- "Calling `DAG.create_dagrun()` without an explicit data
interval is not supported."
- )
-
- if run_type is None or isinstance(run_type, DagRunType):
+ if isinstance(run_type, DagRunType):
pass
- elif isinstance(run_type, str): # Compatibility: run_type used to be
a str.
+ elif isinstance(run_type, str): # Ensure the input value is valid.
run_type = DagRunType(run_type)
else:
- raise ValueError(f"`run_type` should be a DagRunType, not
{type(run_type)}")
-
- if run_id: # Infer run_type from run_id if needed.
- if not isinstance(run_id, str):
- raise ValueError(f"`run_id` should be a str, not
{type(run_id)}")
- inferred_run_type = DagRunType.from_run_id(run_id)
- if run_type is None:
- # No explicit type given, use the inferred type.
- run_type = inferred_run_type
- elif run_type == DagRunType.MANUAL and inferred_run_type !=
DagRunType.MANUAL:
- # Prevent a manual run from using an ID that looks like a
scheduled run.
+ raise ValueError(f"run_type should be a DagRunType, not
{type(run_type)}")
+
+ if not isinstance(run_id, str):
+ raise ValueError(f"`run_id` should be a str, not {type(run_id)}")
+
+ # Prevent a manual run from using an ID that looks like a scheduled
run.
+ if run_type == DagRunType.MANUAL:
+ if (inferred_run_type := DagRunType.from_run_id(run_id)) !=
DagRunType.MANUAL:
raise ValueError(
f"A {run_type.value} DAG run cannot use ID {run_id!r}
since it "
f"is reserved for {inferred_run_type.value} runs"
)
- elif run_type and logical_date is not None: # Generate run_id from
run_type and logical_date.
- run_id = self.timetable.generate_run_id(
- run_type=run_type, logical_date=logical_date,
data_interval=data_interval
- )
- else:
- raise AirflowException(
- "Creating DagRun needs either `run_id` or both `run_type` and
`logical_date`"
- )
-
- regex = airflow_conf.get("scheduler", "allowed_run_id_pattern")
-
- if run_id and not re2.match(RUN_ID_REGEX, run_id):
- if not regex.strip() or not re2.match(regex.strip(), run_id):
Review Comment:
This check seems to be needed, as the SQLAlchemy column validation doesn't
seem to work well
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]