Salfiii opened a new pull request, #67637:
URL: https://github.com/apache/airflow/pull/67637
<!-- SPDX-License-Identifier: Apache-2.0
https://www.apache.org/licenses/LICENSE-2.0 -->
<!--
Implemented Dag-level outles in Task SDK based on issue
https://github.com/apache/airflow/issues/39105, and kept the API consistent
with how BaseOperator handles outlets.
We need a possiblity to trigger other dags by a successfully dag run and I
came across the proposal in above issue that would solve our needs without
manually implementing a specific fan-in/notification task at the end of every
dag and it also fits the new Asset approach of airflow.
I tried to respect the guidlines in the comments of said issue.
**Changes**:
In task-sdk/src/airflow/sdk/definitions/dag.py:
1.Added outlets to DAG init
◦New field: outlets: list[Any]
◦Handles both a single outlet and a colection.
2.Added operator-style outlet composition on Dag
◦DAG.__gt__ now supports dag > outlet
◦DAG.add_outlets(...) appends validated outlets
3.Wired Dag success to outlet emission
◦In __attrs_post_init__, if Dag outlets are set, an outlet-emission callback
is appended to on_success_callback
◦Existing user callbacks are preserved
◦has_on_success_callback is updated
4.Added the callback implementation
◦Emits asset events when a Dag run succeeds
◦Only emits concrete Asset outlets
◦Includes Dag/run source metadata
◦Creates missing asset models before retrying event registration (same
pattern as task outlet handling)
In airflow-core/src/airflow/assets/manager.py:
5.Extended register_asset_change(...) so Dag-level emitters can provide
source fields directly. This makes outlets/events traceable to dag-runs ( I
hope thats ok=
◦Added optional args:
▪source_dag_id
▪source_task_id
▪source_run_id
▪source_map_index
◦Task-instance flow is unchanged
6. In task-sdk/tests/task_sdk/definitions/test_dag.py:
Added tests for:
◦outlets init from single value
◦outlets init from collection
◦dag > outlet
◦rejecting invalid outlet input
◦auto-registration of Dag success outlet callback
◦preserving existing success callback when outlets are enabled
**Why**
The issue asks for Dag-level outlets with the same practical effect as
operator outlets, but triggered when the Dag run succeeds.
* closes: [#ISSUE](https://github.com/apache/airflow/issues/39105)
-->
---
##### Was generative AI tooling used to co-author this PR?
<!--
If generative AI tooling has been used in the process of authoring this PR,
please
change below checkbox to `[X]` followed by the name of the tool, uncomment
the "Generated-by".
-->
- [X] Yes (please specify the tool below)
<!--
Generated-by: Codex via Jetbrains AI Assistant following [the
guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions)
Gen-Ai was mainly used to explain the outlets structure of baseOperator and
to understand on how to integrate the same approach into the DAG because the
codebase is quite complex.
-->
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]