amoghrajesh opened a new pull request, #60968: URL: https://github.com/apache/airflow/pull/60968
---

##### Was generative AI tooling used to co-author this PR?

- [x] No

---

### Why?

Lineage collection is a task execution concern. I checked, and it only runs in worker (Task SDK consumer) processes, never in server components (scheduler, API server). As part of the ongoing client/server separation work, I intend to move the lineage module from airflow-core to the Task SDK.

Some more context:

- `io/path.py` intercepts file I/O during task execution.
- OpenLineage listeners run in the worker process after task completion.
- The server side only orchestrates; it never executes user code.

### What is done?

- Created a new module in the SDK: `sdk/lineage.py`.
- Moved all classes from `airflow.lineage.hook` into it.
- Updated imports to their SDK equivalents:
  - `ProvidersManager` → `ProvidersManagerTaskRuntime`
  - `airflow.utils.log.logging_mixin` → `airflow.sdk.definitions._internal.logging_mixin`
- Created `get_hook_lineage_readers_plugins()` using the SDK's plugin discovery.

### Backward Compatibility

1. For core: both `from airflow.lineage.hook import X` and `from airflow.lineage import hook` remain supported.
2. Provider compatibility is handled in `providers/common/compat/src/airflow/providers/common/compat/sdk.py`:
   - Added the lineage classes to the compat layer.
   - Handled the `DatasetLineageInfo` → `AssetLineageInfo` rename (AF2 → AF3).
3. Removed from core (`airflow-core/src/airflow/plugins_manager.py`): deleted the unused `get_hook_lineage_readers_plugins()` function, which only existed for core's old lineage implementation.
4. Provider developers should import from `airflow.providers.common.compat.sdk`.

### Testing

To gain confidence, I tested a manual end-to-end scenario by running Breeze with the OpenLineage integration: `breeze start-airflow --integration openlineage`

DAG:

```python
from __future__ import annotations

from datetime import datetime

from airflow.sdk import DAG
from airflow.hooks.base import BaseHook
from airflow.providers.standard.operators.python import PythonOperator
from airflow.sdk.lineage import get_hook_lineage_collector


class SimpleHook(BaseHook):
    def process(self):
        # Report one input and one output asset for this hook to the lineage collector
        collector = get_hook_lineage_collector()
        collector.add_input_asset(self, uri="file:///input/data.csv")
        collector.add_output_asset(self, uri="file:///output/result.csv")
        print("Lineage reported! " * 10)


def my_task():
    hook = SimpleHook()
    hook.process()


with DAG(
    dag_id="simple_lineage",
    start_date=datetime(2021, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    PythonOperator(
        task_id="run_hook",
        python_callable=my_task,
    )
```

It's a simple DAG that:

- Creates a custom hook (`SimpleHook`) that reports lineage information
- Reports the input dataset `file:///input/data.csv`
- Reports the output dataset `file:///output/result.csv`
- Registers both assets via `get_hook_lineage_collector()`, so the lineage is sent to OpenLineage

DAG run:

<img width="2496" height="1029" alt="image" src="https://github.com/user-attachments/assets/130028b0-7779-43ad-8981-35a57b3f607a" />

Marquez:

<img width="2496" height="1029" alt="image" src="https://github.com/user-attachments/assets/d3ceb23d-cdc5-4dbe-9fdd-8e94618e9fe5" />
<img width="2496" height="1029" alt="image" src="https://github.com/user-attachments/assets/f2e66c48-50d9-4026-bb64-2faf0d8412e3" />
<img width="2496" height="1029" alt="image" src="https://github.com/user-attachments/assets/f1b78bce-b9d5-4216-8809-299755a4a6bb" />
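Regarding the core item under Backward Compatibility (keeping `from airflow.lineage.hook import X` working): one common way to keep an old import path alive after a move is a module-level `__getattr__` (PEP 562) that forwards lookups to the new module. The sketch below assumes that approach and also emits a `DeprecationWarning`, which this PR does not mention; it is not necessarily how the PR implements the redirect. The helper `make_forwarding_module`, the module names `sdk_lineage` / `legacy_lineage_hook`, and the placeholder class (which only borrows the name `HookLineageCollector`) are hypothetical stand-ins.

```python
import sys
import types
import warnings


def make_forwarding_module(old_name: str, new_module: types.ModuleType) -> types.ModuleType:
    """Register a module ``old_name`` whose missing attributes resolve to ``new_module``.

    Python calls a module-level ``__getattr__`` (PEP 562) only when normal lookup
    in the module's namespace fails, so forwarded names are resolved lazily.
    """
    shim = types.ModuleType(old_name)

    def __getattr__(name: str):
        try:
            value = getattr(new_module, name)
        except AttributeError:
            # Keep the usual error for names the new module does not provide either.
            raise AttributeError(f"module {old_name!r} has no attribute {name!r}") from None
        # Assumption: warn callers so they migrate to the new import path.
        warnings.warn(
            f"{old_name}.{name} has moved; import it from {new_module.__name__} instead",
            DeprecationWarning,
            stacklevel=2,
        )
        return value

    shim.__getattr__ = __getattr__
    sys.modules[old_name] = shim
    return shim


# Hypothetical stand-in for the new SDK module that now owns the lineage classes.
sdk_lineage = types.ModuleType("sdk_lineage")


class HookLineageCollector:  # placeholder class, for illustration only
    pass


sdk_lineage.HookLineageCollector = HookLineageCollector
sys.modules["sdk_lineage"] = sdk_lineage

# Hypothetical stand-in for the old core import path.
make_forwarding_module("legacy_lineage_hook", sdk_lineage)

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    # The old-style import keeps working and resolves to the same class object.
    from legacy_lineage_hook import HookLineageCollector as LegacyCollector
```

Because the shim is registered in `sys.modules`, both `import legacy_lineage_hook` and `from legacy_lineage_hook import ...` resolve. For a dotted path such as `airflow.lineage.hook`, the parent package would also need to exist and expose the submodule.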

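The provider compat layer's handling of the `DatasetLineageInfo` → `AssetLineageInfo` rename (AF2 → AF3) can be pictured as a version-tolerant import: try the new location first, fall back to the old one, and expose the result under the new name. This is a generic sketch under that assumption, not the contents of `airflow.providers.common.compat.sdk`; the helper `import_first` and the module names `af3_sdk_lineage` / `af2_lineage_hook` are hypothetical.

```python
import importlib
import sys
import types
from typing import Any


def import_first(*candidates: tuple[str, str]) -> Any:
    """Return the attribute from the first (module, attribute) pair that imports.

    Candidates are tried in order, so the newest location goes first and older
    locations (possibly under an older class name) act as fallbacks.
    """
    errors: list[str] = []
    for module_name, attr_name in candidates:
        try:
            module = importlib.import_module(module_name)
            return getattr(module, attr_name)
        except (ImportError, AttributeError) as exc:
            errors.append(f"{module_name}.{attr_name}: {exc}")
    raise ImportError("no candidate could be imported: " + "; ".join(errors))


# Simulate an older environment: only the legacy module exists, and the class
# still has its pre-rename name. Both module names are hypothetical.
af2_lineage_hook = types.ModuleType("af2_lineage_hook")


class DatasetLineageInfo:  # placeholder for the pre-rename class
    pass


af2_lineage_hook.DatasetLineageInfo = DatasetLineageInfo
sys.modules["af2_lineage_hook"] = af2_lineage_hook

# Callers always use the new name, whichever location actually provided it.
AssetLineageInfo = import_first(
    ("af3_sdk_lineage", "AssetLineageInfo"),  # new location, absent in this simulation
    ("af2_lineage_hook", "DatasetLineageInfo"),  # old location and old class name
)
```

A plain chain of `try` / `except ImportError` blocks achieves the same result; the helper just keeps the fallback order in one place.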