amoghrajesh opened a new pull request, #61157:
URL: https://github.com/apache/airflow/pull/61157

   Reattempting https://github.com/apache/airflow/pull/60968, which was reverted due to failing tests in https://github.com/apache/airflow/actions/runs/21411369808/job/61650388211.
   
   The second commit contains the changes needed to fix the issues that led to those failures.
   
   ### Why?
   
   Lineage collection is a task execution concern. On checking, it only runs in worker (Task SDK consumer) processes, never in server components (scheduler, API server). As part of the ongoing client/server separation work, I intend to move the lineage module from airflow-core to task-sdk.
   
   Some more context:
   
   - `io/path.py` intercepts file I/O during task execution
   - OpenLineage listeners run in the worker process after task completion
   - The scheduler only orchestrates; it never executes user code
   
   
   ### What is done?
   
   - Created a new module in the SDK: `sdk/lineage.py`
   - Moved all classes from `airflow.lineage.hook`
      - Updated imports to SDK equivalents:
        - `ProvidersManager` → `ProvidersManagerTaskRuntime`
        - `airflow.utils.log.logging_mixin` → `airflow.sdk.definitions._internal.logging_mixin`
      - Created `get_hook_lineage_readers_plugins()` using the SDK's plugin discovery
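
As a rough illustration of what a reader-discovery helper of this kind might do, here is a sketch assuming plugins declare reader classes in a `hook_lineage_readers` attribute. `HookLineageReader`, `ExampleReader`, `ExamplePlugin`, and `PluginWithoutReaders` are stand-ins defined in the snippet, and passing already-discovered plugins as an argument is a simplification for illustration; the real helper relies on the SDK's plugin discovery and may look different.

```python
from __future__ import annotations


class HookLineageReader:
    """Stand-in base class for hook lineage readers (hypothetical)."""


class ExampleReader(HookLineageReader):
    """Stand-in reader class contributed by a plugin."""


class ExamplePlugin:
    # Assumption: plugins list their reader classes in `hook_lineage_readers`.
    name = "example_plugin"
    hook_lineage_readers = [ExampleReader]


class PluginWithoutReaders:
    # A plugin that contributes no readers at all.
    name = "no_readers"


def get_hook_lineage_readers_plugins(plugins: list[type]) -> list[type]:
    """Collect reader classes from already-discovered plugin objects (sketch)."""
    readers: list[type] = []
    for plugin in plugins:
        # Missing or empty attributes simply contribute nothing.
        readers.extend(getattr(plugin, "hook_lineage_readers", []) or [])
    return readers


print(get_hook_lineage_readers_plugins([ExamplePlugin, PluginWithoutReaders]))
```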
   
   ### Backward Compatibility
   
   1. For core:
      - The old import paths keep working: both `from airflow.lineage.hook import X` and `from airflow.lineage import hook` are supported
   
   2. Provider compatibility is handled in `providers/common/compat/src/airflow/providers/common/compat/sdk.py`:
      - Added lineage classes to compat layer
      - Handles `DatasetLineageInfo` → `AssetLineageInfo` rename (AF2 → AF3)
   
   3. Removed from core (`airflow-core/src/airflow/plugins_manager.py`):
      - Deleted the unused `get_hook_lineage_readers_plugins()` function
      - The function only existed to serve core's old lineage implementation
   
   4. Provider developers should prefer importing from `airflow.providers.common.compat.sdk`.
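
For illustration only, the general idea behind keeping an old import path alive after a move can be sketched with the PEP 562 module-level `__getattr__` pattern: the old module forwards attribute lookups to the new one, maps renamed symbols, and emits a `DeprecationWarning`. The module names `demo_core_lineage_hook` and `demo_sdk_lineage` and the stand-in classes are hypothetical, the old-to-new direction of the rename table is an assumption, and this is not the code in this PR or in `airflow.providers.common.compat.sdk`.

```python
import sys
import types
import warnings

# Stand-in for the new location (hypothetical module name).
new_mod = types.ModuleType("demo_sdk_lineage")


class AssetLineageInfo:
    """Stand-in for the AF3 class name."""


class HookLineageCollector:
    """Stand-in for a class that moved without being renamed."""


new_mod.AssetLineageInfo = AssetLineageInfo
new_mod.HookLineageCollector = HookLineageCollector
sys.modules["demo_sdk_lineage"] = new_mod

# Old name -> new name; only renamed symbols need an entry.
_RENAMED = {"DatasetLineageInfo": "AssetLineageInfo"}

# Stand-in for the old location (hypothetical module name).
old_mod = types.ModuleType("demo_core_lineage_hook")


def _forward(name: str):
    """PEP 562 module __getattr__: resolve old names lazily and warn."""
    target = _RENAMED.get(name, name)
    if not hasattr(new_mod, target):
        raise AttributeError(f"module {old_mod.__name__!r} has no attribute {name!r}")
    warnings.warn(
        f"{old_mod.__name__}.{name} is deprecated; use {new_mod.__name__}.{target}",
        DeprecationWarning,
        stacklevel=2,
    )
    return getattr(new_mod, target)


# Module-level __getattr__ is only consulted for names not found normally.
old_mod.__getattr__ = _forward
sys.modules["demo_core_lineage_hook"] = old_mod

# Usage: the old import path keeps working but emits DeprecationWarnings.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    from demo_core_lineage_hook import DatasetLineageInfo
    from demo_core_lineage_hook import HookLineageCollector as OldCollector
```

Because the lookup is lazy, nothing is resolved or warned about until a caller actually touches an old name.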
   
   ### Testing
   
   To gain confidence, I tested a manual end-to-end scenario by running breeze with the OpenLineage integration:
   `breeze start-airflow --integration openlineage`
   
   DAG:
   ```python
   from __future__ import annotations
   
   from datetime import datetime
   
   from airflow.sdk import DAG
   from airflow.hooks.base import BaseHook
   from airflow.providers.standard.operators.python import PythonOperator
   
   
   from airflow.sdk.lineage import get_hook_lineage_collector
   
   
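   class SimpleHook(BaseHook):
       """Minimal custom hook that only reports lineage."""
   
       def process(self):
           # Fetch the hook lineage collector from its new home in the Task SDK.
           collector = get_hook_lineage_collector()
   
           # Report one input and one output asset, attributed to this hook.
           collector.add_input_asset(self, uri="file:///input/data.csv")
           collector.add_output_asset(self, uri="file:///output/result.csv")
   
           print("Lineage reported! " * 10)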
   
   
   def my_task():
       hook = SimpleHook()
       hook.process()
   
   
   with DAG(
       dag_id="simple_lineage",
       start_date=datetime(2021, 1, 1),
       schedule=None,
       catchup=False,
   ) as dag:
       PythonOperator(
           task_id="run_hook",
           python_callable=my_task,
       )
   ```
   
   It's a simple DAG that:
   - Creates a custom hook (`SimpleHook`) that reports lineage information
   - Reports the input dataset `file:///input/data.csv`
   - Reports the output dataset `file:///output/result.csv`
   - Sends lineage to OpenLineage by registering the assets with `get_hook_lineage_collector()`
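
Conceptually, the hook reports assets to a collector living in the worker process, and the OpenLineage listener later reads them from that same process after the task completes. Below is a toy model of that flow, assuming one collector per process and using plain URI strings instead of Airflow asset objects. `ToyLineageCollector`, `get_toy_collector`, and `on_task_complete` are hypothetical names; the real `get_hook_lineage_collector()` and its `add_input_asset`/`add_output_asset` methods may have different signatures and behavior.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class ToyLineageCollector:
    """Hypothetical in-memory stand-in for a hook lineage collector."""

    inputs: list[tuple[str, str]] = field(default_factory=list)
    outputs: list[tuple[str, str]] = field(default_factory=list)

    def add_input_asset(self, context: object, uri: str) -> None:
        # Remember which hook class reported which input URI.
        self.inputs.append((type(context).__name__, uri))

    def add_output_asset(self, context: object, uri: str) -> None:
        # Remember which hook class reported which output URI.
        self.outputs.append((type(context).__name__, uri))


_collector: ToyLineageCollector | None = None


def get_toy_collector() -> ToyLineageCollector:
    """Return a single collector per process, created lazily on first use."""
    global _collector
    if _collector is None:
        _collector = ToyLineageCollector()
    return _collector


class SimpleHook:
    """Mirrors the DAG's hook, minus the Airflow base class."""

    def process(self) -> None:
        collector = get_toy_collector()
        collector.add_input_asset(self, uri="file:///input/data.csv")
        collector.add_output_asset(self, uri="file:///output/result.csv")


def on_task_complete() -> dict[str, list[str]]:
    """Stand-in for a listener reading the collected lineage after the task ends."""
    collector = get_toy_collector()
    return {
        "inputs": [uri for _, uri in collector.inputs],
        "outputs": [uri for _, uri in collector.outputs],
    }


SimpleHook().process()
print(on_task_complete())
# → {'inputs': ['file:///input/data.csv'], 'outputs': ['file:///output/result.csv']}
```

Keeping the collector as a per-process object is what makes this a worker-only concern in the model: the data never has to leave the process that ran the hook.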
   
   DAG run:
   <img width="2496" height="1029" alt="image" src="https://github.com/user-attachments/assets/130028b0-7779-43ad-8981-35a57b3f607a" />
   
   
   Marquez:
   <img width="2496" height="1029" alt="image" src="https://github.com/user-attachments/assets/d3ceb23d-cdc5-4dbe-9fdd-8e94618e9fe5" />
   
   
   <img width="2496" height="1029" alt="image" src="https://github.com/user-attachments/assets/f2e66c48-50d9-4026-bb64-2faf0d8412e3" />
   
   <img width="2496" height="1029" alt="image" src="https://github.com/user-attachments/assets/f1b78bce-b9d5-4216-8809-299755a4a6bb" />
   
   


-- 
This is an automated message from the Apache Git Service.