This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ae6fdea5ef8 docs(Asset-Partition): remove outdated AIP-76 comments
(#62863)
ae6fdea5ef8 is described below
commit ae6fdea5ef8b43d846165a5ef3f7177e284dac34
Author: Wei Lee <[email protected]>
AuthorDate: Thu Mar 5 09:37:46 2026 +0800
docs(Asset-Partition): remove outdated AIP-76 comments (#62863)
---
airflow-core/src/airflow/api/common/mark_tasks.py | 6 +++---
.../src/airflow/api_fastapi/execution_api/routes/dag_runs.py | 2 --
airflow-core/src/airflow/dag_processing/collection.py | 6 +++---
.../src/airflow/example_dags/example_outlet_event_extra.py | 8 +-------
airflow-core/src/airflow/serialization/definitions/dag.py | 2 --
airflow-core/tests/unit/jobs/test_scheduler_job.py | 3 ---
task-sdk/src/airflow/sdk/bases/timetable.py | 8 +++-----
task-sdk/src/airflow/sdk/definitions/timetables/trigger.py | 1 -
8 files changed, 10 insertions(+), 26 deletions(-)
diff --git a/airflow-core/src/airflow/api/common/mark_tasks.py
b/airflow-core/src/airflow/api/common/mark_tasks.py
index d1a068887d8..a656c00c7aa 100644
--- a/airflow-core/src/airflow/api/common/mark_tasks.py
+++ b/airflow-core/src/airflow/api/common/mark_tasks.py
@@ -145,7 +145,7 @@ def find_task_relatives(
@provide_session
def get_run_ids(dag: SerializedDAG, run_id: str, future: bool, past: bool,
session: SASession = NEW_SESSION):
- """Return DAG executions' run_ids."""
+ """Return Dag executions' run_ids."""
current_logical_date = session.scalar(
select(DagRun.logical_date).where(DagRun.dag_id == dag.dag_id,
DagRun.run_id == run_id)
)
@@ -168,11 +168,11 @@ def get_run_ids(dag: SerializedDAG, run_id: str, future:
bool, past: bool, sessi
.limit(1)
)
- # determine run_id range of dag runs and tasks to consider
+ # determine run_id range of Dag runs and tasks to consider
end_date = last_logical_date if future else current_logical_date
start_date = current_logical_date if not past else first_logical_date
if not dag.timetable.can_be_scheduled:
- # If the DAG never schedules, need to look at existing DagRun if the
+ # If the Dag never schedules, need to look at existing DagRun if the
# user wants future or past runs.
dag_runs = session.scalars(
select(DagRun).where(
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
index 14886bd4651..69a06eca9b5 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
@@ -124,8 +124,6 @@ def trigger_dag_run(
)
try:
- # todo: AIP-76 add partition key here
- # https://github.com/apache/airflow/issues/61075
trigger_dag(
dag_id=dag_id,
run_id=run_id,
diff --git a/airflow-core/src/airflow/dag_processing/collection.py
b/airflow-core/src/airflow/dag_processing/collection.py
index 35feef8b6b6..5ea7d833151 100644
--- a/airflow-core/src/airflow/dag_processing/collection.py
+++ b/airflow-core/src/airflow/dag_processing/collection.py
@@ -173,15 +173,15 @@ class _RunInfo(NamedTuple):
:param dags: dict of dags to query
"""
- # Skip these queries entirely if no DAGs can be scheduled to save time.
+ # Skip these queries entirely if no Dags can be scheduled to save time.
if not dag.timetable.can_be_scheduled:
return cls(None, 0)
if dag.timetable.partitioned:
- log.info("getting latest run for partitioned dag",
dag_id=dag.dag_id)
+ log.info("Getting latest run for partitioned Dag",
dag_id=dag.dag_id)
latest_run =
session.scalar(_get_latest_runs_stmt_partitioned(dag_id=dag.dag_id))
else:
- log.info("getting latest run for non-partitioned dag",
dag_id=dag.dag_id)
+ log.info("Getting latest run for non-partitioned Dag",
dag_id=dag.dag_id)
latest_run =
session.scalar(_get_latest_runs_stmt(dag_id=dag.dag_id))
if latest_run:
log.info(
diff --git
a/airflow-core/src/airflow/example_dags/example_outlet_event_extra.py
b/airflow-core/src/airflow/example_dags/example_outlet_event_extra.py
index cdac990f2d1..04e88554d16 100644
--- a/airflow-core/src/airflow/example_dags/example_outlet_event_extra.py
+++ b/airflow-core/src/airflow/example_dags/example_outlet_event_extra.py
@@ -16,7 +16,7 @@
# under the License.
"""
-Example DAG to demonstrate annotating an asset event with extra information.
+Example Dag to demonstrate annotating an asset event with extra information.
Also see examples in ``example_inlet_event_extra.py``.
"""
@@ -68,12 +68,6 @@ with DAG(
def _asset_with_extra_from_classic_operator_post_execute(context, result):
context["outlet_events"][asset].extra = {"hi": "bye"}
- # TODO: AIP-76 probably we want to make it so this could be
- # AssetEvent, list[AssetEvent], [], or None. And if [] or None,
- # then no events would be emitted. The use case is, instead of
unconditionally
- # emitting an event, we could optionally emit no events, or multiple
events,
- # i.e. for different partitions.
- # https://github.com/apache/airflow/issues/58474
BashOperator(
task_id="asset_with_extra_from_classic_operator",
diff --git a/airflow-core/src/airflow/serialization/definitions/dag.py
b/airflow-core/src/airflow/serialization/definitions/dag.py
index 91db3874913..4cb32ed35d4 100644
--- a/airflow-core/src/airflow/serialization/definitions/dag.py
+++ b/airflow-core/src/airflow/serialization/definitions/dag.py
@@ -442,8 +442,6 @@ class SerializedDAG:
DagRunInfo instances yielded if their ``logical_date`` is not earlier
than ``earliest``, nor later than ``latest``. The instances are ordered
by their ``logical_date`` from earliest to latest.
-
- # TODO: AIP-76 see issue https://github.com/apache/airflow/issues/60455
"""
if earliest is None:
earliest = self._time_restriction.earliest
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 275f85fd83f..ae6388f1b8a 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -4858,9 +4858,6 @@ class TestSchedulerJob:
self.job_runner = SchedulerJobRunner(job=scheduler_job,
executors=[self.null_exec])
self.job_runner._create_dag_runs(dag_models=[dag_maker.dag_model],
session=session)
kwargs = mock_create.call_args.kwargs
- # todo: AIP-76 let's add partition_date to dag run
- # and we should probably have something on DagRun that can return the
DagRunInfo for
- # that dag run. See https://github.com/apache/airflow/issues/61167
actual = DagRunInfo(
run_after=kwargs["run_after"],
data_interval=kwargs["data_interval"],
diff --git a/task-sdk/src/airflow/sdk/bases/timetable.py
b/task-sdk/src/airflow/sdk/bases/timetable.py
index 4488cc9ba6d..e732566f153 100644
--- a/task-sdk/src/airflow/sdk/bases/timetable.py
+++ b/task-sdk/src/airflow/sdk/bases/timetable.py
@@ -37,18 +37,16 @@ class BaseTimetable:
active_runs_limit: int | None = None
"""
- Maximum active runs that can be active at one time for a DAG.
+ Maximum active runs that can be active at one time for a Dag.
- This is called during DAG initialization, and the return value is used as
- the DAG's default ``max_active_runs`` if not set on the DAG explicitly.
This
+ This is called during Dag initialization, and the return value is used as
+ the Dag's default ``max_active_runs`` if not set on the Dag explicitly.
This
should generally return *None* (no limit), but some timetables may limit
parallelism, such as ``ContinuousTimetable``.
"""
asset_condition: BaseAsset | None = None
- # TODO: AIP-76 just add partition-driven field here to differentiate the
behavior
-
def validate(self) -> None:
"""
Validate the timetable is correctly specified.
diff --git a/task-sdk/src/airflow/sdk/definitions/timetables/trigger.py
b/task-sdk/src/airflow/sdk/definitions/timetables/trigger.py
index d58df8ab3f7..608f4c0a57f 100644
--- a/task-sdk/src/airflow/sdk/definitions/timetables/trigger.py
+++ b/task-sdk/src/airflow/sdk/definitions/timetables/trigger.py
@@ -170,7 +170,6 @@ class CronPartitionTimetable(CronTriggerTimetable):
# todo: AIP-76 we can't infer partition date from this, so we need to
store it separately
key_format: str = r"%Y-%m-%dT%H:%M:%S",
) -> None:
- # super().__init__(cron, timezone=timezone,
run_immediately=run_immediately)
if not isinstance(run_offset, (int, NoneType)):
# todo: AIP-76 implement timedelta / relative delta?
raise ValueError("Run offset other than integer not supported
yet.")