amoghrajesh opened a new pull request, #61157: URL: https://github.com/apache/airflow/pull/61157
Reattempting https://github.com/apache/airflow/pull/60968 after it was reverted due to failing tests in https://github.com/apache/airflow/actions/runs/21411369808/job/61650388211

The second commit makes the changes needed to fix the issues that led to the failures.

### Why?

Lineage collection is a task execution concern. On checking, it only runs in worker (Task SDK consumer) processes, not in any server components (scheduler, API server). I intend to move the lineage module from airflow-core to task-sdk as part of the ongoing client-server separation work.

Some more context:
- `io/path.py` intercepts file I/O during task execution
- OpenLineage listeners run in the worker process after task completion
- It only orchestrates, never executes user code

### What is done?

- Created a new module in the SDK: `sdk/lineage.py`
- Moved all classes from `airflow.lineage.hook`
- Updated imports to SDK equivalents:
  - `ProvidersManager` → `ProvidersManagerTaskRuntime`
  - `airflow.utils.log.logging_mixin` → `airflow.sdk.definitions._internal.logging_mixin`
- Created `get_hook_lineage_readers_plugins()` using the SDK's plugin discovery

### Backward Compatibility

1. For core
   - Supports both `from airflow.lineage.hook import X` and `from airflow.lineage import hook`
2. Provider compatibility has been handled with `providers/common/compat/src/airflow/providers/common/compat/sdk.py`
   - Added lineage classes to the compat layer
   - Handles the `DatasetLineageInfo` → `AssetLineageInfo` rename (AF2 → AF3)
3. Removed from core - `airflow-core/src/airflow/plugins_manager.py`
   - Deleted the unused `get_hook_lineage_readers_plugins()` function
   - The function only existed for core's old lineage implementation
4. For provider developers, it is recommended to use imports from `airflow.providers.common.compat.sdk`

### Testing

To gain confidence, I tested a manual e2e scenario for this.
Ran breeze with OL integration: `breeze start-airflow --integration openlineage`

DAG:

```python
from __future__ import annotations

from datetime import datetime

from airflow.sdk import DAG
from airflow.hooks.base import BaseHook
from airflow.providers.standard.operators.python import PythonOperator
from airflow.sdk.lineage import get_hook_lineage_collector


class SimpleHook(BaseHook):
    def process(self):
        # Register one input and one output asset with the hook lineage collector.
        collector = get_hook_lineage_collector()
        collector.add_input_asset(self, uri="file:///input/data.csv")
        collector.add_output_asset(self, uri="file:///output/result.csv")
        print("Lineage reported! " * 10)


def my_task():
    hook = SimpleHook()
    hook.process()


with DAG(
    dag_id="simple_lineage",
    start_date=datetime(2021, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    PythonOperator(
        task_id="run_hook",
        python_callable=my_task,
    )
```

It's a simple DAG that does this:
- Creates a custom hook (`SimpleHook`) that reports lineage information
- Reports input dataset: `file:///input/data.csv`
- Reports output dataset: `file:///output/result.csv`
- Sends lineage to OpenLineage by using `get_hook_lineage_collector()` to register assets

DAG run:

<img width="2496" height="1029" alt="image" src="https://github.com/user-attachments/assets/130028b0-7779-43ad-8981-35a57b3f607a" />

Marquez:

<img width="2496" height="1029" alt="image" src="https://github.com/user-attachments/assets/d3ceb23d-cdc5-4dbe-9fdd-8e94618e9fe5" />
<img width="2496" height="1029" alt="image" src="https://github.com/user-attachments/assets/f2e66c48-50d9-4026-bb64-2faf0d8412e3" />
<img width="2496" height="1029" alt="image" src="https://github.com/user-attachments/assets/f1b78bce-b9d5-4216-8809-299755a4a6bb" />
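
For readers unfamiliar with the pattern, the provider-side fallback described under Backward Compatibility (prefer the new name, fall back to the older one) can be sketched roughly as below. This is only an illustration of the general technique, not the actual contents of the compat `sdk.py`: the helper name `import_first_available` and the module path `hypothetical_new_sdk.lineage` are made up, and a stdlib class stands in for the older class so the sketch runs without Airflow installed.

```python
from __future__ import annotations

import importlib
from typing import Any


def import_first_available(candidates: list[tuple[str, str]]) -> Any:
    """Return the first importable attribute from (module, attribute) pairs.

    Hypothetical helper: candidates are tried in order, so the preferred
    (newest) location should be listed first.
    """
    errors: list[str] = []
    for module_name, attr_name in candidates:
        try:
            module = importlib.import_module(module_name)
            return getattr(module, attr_name)
        except (ImportError, AttributeError) as exc:
            # Remember why this candidate failed, then try the next one.
            errors.append(f"{module_name}.{attr_name}: {exc}")
    raise ImportError("no candidate could be imported: " + "; ".join(errors))


# Stand-in targets (assumptions, not real Airflow paths): the first entry plays
# the role of the new location and is not installed, so the helper falls back
# to the second entry, which plays the role of the older, renamed class.
LineageInfo = import_first_available(
    [
        ("hypothetical_new_sdk.lineage", "AssetLineageInfo"),
        ("collections", "OrderedDict"),
    ]
)
```

Provider code written against such a layer imports a single name and does not need to know which Airflow version supplied it.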
