amoghrajesh opened a new pull request, #60968:
URL: https://github.com/apache/airflow/pull/60968

   
   ---
   
   ##### Was generative AI tooling used to co-author this PR?
   
   
   - [x] No
   
   
   ---
   
   
   ### Why?
   
   Lineage collection is a task-execution concern: on checking, it only runs in worker (Task SDK consumer) processes, never in server components (scheduler, API server). I intend to move the lineage module from airflow-core to task-sdk as part of the ongoing client/server separation work.
   
   Some more context:
   
   - `io/path.py` intercepts file I/O during task execution
   - OpenLineage listeners run in the worker process after task completion
   - The scheduler only orchestrates; it never executes user code
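The first point above can be made concrete with a toy model. The sketch below is hypothetical and is not Airflow's `io/path.py`: `tracked_open` and `ProcessLocalCollector` are illustrative names. It shows lineage being recorded at the moment task code touches a file, into state that lives only in the current process, which is why collection has to happen wherever task code runs.

```python
from __future__ import annotations

import os
import tempfile
from dataclasses import dataclass, field


@dataclass
class ProcessLocalCollector:
    """Hypothetical stand-in for lineage state held in the task's own process."""

    inputs: list[str] = field(default_factory=list)
    outputs: list[str] = field(default_factory=list)


# Module-level, i.e. one per process: only code running in this process can fill it.
COLLECTOR = ProcessLocalCollector()

_WRITE_FLAGS = ("w", "a", "x", "+")


def tracked_open(path: str, mode: str = "r", **kwargs):
    """Open ``path`` like ``open()`` and record it as an input or output asset.

    Any mode containing a write flag (w, a, x, +) is recorded as an output;
    everything else is recorded as an input.
    """
    uri = "file://" + os.path.abspath(path)
    if any(flag in mode for flag in _WRITE_FLAGS):
        COLLECTOR.outputs.append(uri)
    else:
        COLLECTOR.inputs.append(uri)
    return open(path, mode, **kwargs)


# Usage: write a file, then read it back; both accesses are recorded in-process.
with tempfile.TemporaryDirectory() as tmp:
    data_path = os.path.join(tmp, "data.csv")
    with tracked_open(data_path, "w") as f:
        f.write("a,b\n1,2\n")
    with tracked_open(data_path) as f:
        f.read()
```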
   
   
   ### What is done?
   
   - Created a new module in the SDK: `sdk/lineage.py`
   - Moved all classes from `airflow.lineage.hook` into it
      - Updated imports to their SDK equivalents:
        - `ProvidersManager` → `ProvidersManagerTaskRuntime`
        - `airflow.utils.log.logging_mixin` → `airflow.sdk.definitions._internal.logging_mixin`
      - Created `get_hook_lineage_readers_plugins()` using the SDK's plugin discovery
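The relationship between reader plugins and the collector can be sketched as a simplified, hypothetical model. `FakePlugin`, `collect_reader_classes`, `Collector`, `NoOpCollector` and `make_collector` are illustrative names; the plugin list is passed in explicitly here, whereas the SDK's `get_hook_lineage_readers_plugins()` performs its own plugin discovery. The assumption modelled is that collection is only switched on when at least one registered reader will consume the results.

```python
from __future__ import annotations

from dataclasses import dataclass, field


class HookLineageReader:
    """Stand-in for a reader class that a plugin can register."""


@dataclass
class FakePlugin:
    """Hypothetical plugin object that declares hook lineage reader classes."""

    name: str
    hook_lineage_readers: list[type[HookLineageReader]] = field(default_factory=list)


class Collector:
    """Would record hook lineage; the recording methods are omitted in this sketch."""

    enabled = True


class NoOpCollector(Collector):
    """Disabled variant: a real no-op collector would accept the same calls and drop them."""

    enabled = False


def collect_reader_classes(plugins: list[FakePlugin]) -> list[type[HookLineageReader]]:
    """Flatten the reader classes declared by every loaded plugin, in order."""
    readers: list[type[HookLineageReader]] = []
    for plugin in plugins:
        readers.extend(plugin.hook_lineage_readers)
    return readers


def make_collector(plugins: list[FakePlugin]) -> Collector:
    """Enable collection only if at least one reader will consume the results."""
    if collect_reader_classes(plugins):
        return Collector()
    return NoOpCollector()


# Usage: a plugin that ships a reader enables collection; no readers means a no-op.
class ExampleReader(HookLineageReader):
    """Hypothetical reader, e.g. one shipped by a lineage backend's plugin."""


active_collector = make_collector([FakePlugin("with-reader", [ExampleReader])])
idle_collector = make_collector([FakePlugin("without-reader")])
```

Returning a disabled collector instead of `None` keeps call sites such as `collector.add_input_asset(...)` free of conditionals.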
   
   ### Backward Compatibility
   
   1. Core:
      - Both `from airflow.lineage.hook import X` and `from airflow.lineage import hook` keep working.

   2. Provider compatibility is handled in `providers/common/compat/src/airflow/providers/common/compat/sdk.py`:
      - Added the lineage classes to the compat layer
      - Handled the `DatasetLineageInfo` → `AssetLineageInfo` rename (AF2 → AF3)

   3. Removed from core (`airflow-core/src/airflow/plugins_manager.py`):
      - Deleted the unused `get_hook_lineage_readers_plugins()` function, which only existed for core's old lineage implementation

   4. Provider developers are recommended to import from `airflow.providers.common.compat.sdk`.
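Provider code that has to run on both Airflow 2 and Airflow 3 commonly resolves such moves and renames with an ordered import fallback. The sketch below shows that general pattern; `import_first` is a hypothetical helper and not necessarily how `airflow.providers.common.compat.sdk` is implemented. The candidate order in the comment (new Task SDK location, then Airflow 3 core, then the Airflow 2 `DatasetLineageInfo` name) is an assumption based on the description above.

```python
from __future__ import annotations

import importlib
from typing import Any


def import_first(*candidates: tuple[str, str]) -> Any:
    """Return the first ``attr`` that can be imported from its ``module``, in order.

    Both ImportError (module missing) and AttributeError (module present but the
    name was renamed or removed) move on to the next candidate.
    """
    failures: list[str] = []
    for module_path, attr in candidates:
        try:
            module = importlib.import_module(module_path)
            return getattr(module, attr)
        except (ImportError, AttributeError) as exc:
            failures.append(f"{module_path}.{attr}: {exc!r}")
    raise ImportError("No candidate could be imported:\n" + "\n".join(failures))


# Assumed candidate order for the lineage rename (newest location first):
#   AssetLineageInfo = import_first(
#       ("airflow.sdk.lineage", "AssetLineageInfo"),     # Task SDK, after this PR
#       ("airflow.lineage.hook", "AssetLineageInfo"),    # Airflow 3 core
#       ("airflow.lineage.hook", "DatasetLineageInfo"),  # Airflow 2 name
#   )
# Runnable stand-in that uses only the standard library:
OrderedDictAlias = import_first(
    ("module_that_does_not_exist", "OrderedDict"),
    ("collections", "OrderedDict"),
)
```

Catching `AttributeError` alongside `ImportError` is what lets one call cover a rename where the module exists but only under the old name. The trade-off is that a broad catch can also hide a genuine import error inside a candidate module, which is why the collected failures are included in the final exception.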
   
   ### Testing
   
   To gain confidence, I tested a manual end-to-end scenario by running Breeze with the OpenLineage integration:
   `breeze start-airflow --integration openlineage`
   
   DAG:
   ```python
   from __future__ import annotations

   from datetime import datetime

   from airflow.hooks.base import BaseHook
   from airflow.providers.standard.operators.python import PythonOperator
   from airflow.sdk import DAG
   from airflow.sdk.lineage import get_hook_lineage_collector


   class SimpleHook(BaseHook):
       """Minimal hook that reports one input and one output asset."""

       def process(self):
           collector = get_hook_lineage_collector()
           # The hook instance itself is passed as the lineage context.
           collector.add_input_asset(self, uri="file:///input/data.csv")
           collector.add_output_asset(self, uri="file:///output/result.csv")
           print("Lineage reported! " * 10)


   def my_task():
       hook = SimpleHook()
       hook.process()


   with DAG(
       dag_id="simple_lineage",
       start_date=datetime(2021, 1, 1),
       schedule=None,
       catchup=False,
   ) as dag:
       PythonOperator(
           task_id="run_hook",
           python_callable=my_task,
       )
   ```
   
   It's a simple DAG that:
   - Creates a custom hook (`SimpleHook`) that reports lineage information
   - Reports the input asset `file:///input/data.csv`
   - Reports the output asset `file:///output/result.csv`
   - Registers both assets via `get_hook_lineage_collector()`, from where they are sent to OpenLineage
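What the collector accumulates for this DAG can be illustrated with a deliberately simplified model of the two calls `SimpleHook.process()` makes. This is not the Task SDK's `HookLineageCollector`: `ToyLineageCollector`, `LineageEntry`, `StubHook`, the `(uri, id(context))` deduplication key and the `count` field are assumptions for illustration. It shows repeated reports of the same asset from the same hook being merged into one record with a count, with the hook itself kept as the reporting context.

```python
from __future__ import annotations

from collections import Counter
from dataclasses import dataclass


@dataclass(frozen=True)
class LineageEntry:
    """Simplified lineage record: which asset, how often, reported by whom."""

    uri: str
    count: int
    context: object


class ToyLineageCollector:
    """Hypothetical, minimal collector that deduplicates by (uri, id(context))."""

    def __init__(self) -> None:
        self._inputs: Counter = Counter()
        self._outputs: Counter = Counter()
        # Keeping a reference to each context keeps it alive, so its id() stays unique.
        self._contexts: dict[int, object] = {}

    def _record(self, bucket: Counter, context: object, uri: str) -> None:
        self._contexts[id(context)] = context
        bucket[(uri, id(context))] += 1

    def add_input_asset(self, context: object, uri: str) -> None:
        self._record(self._inputs, context, uri)

    def add_output_asset(self, context: object, uri: str) -> None:
        self._record(self._outputs, context, uri)

    def _entries(self, bucket: Counter) -> list[LineageEntry]:
        return [
            LineageEntry(uri, count, self._contexts[ctx_id])
            for (uri, ctx_id), count in bucket.items()
        ]

    @property
    def inputs(self) -> list[LineageEntry]:
        return self._entries(self._inputs)

    @property
    def outputs(self) -> list[LineageEntry]:
        return self._entries(self._outputs)


class StubHook:
    """Stand-in for SimpleHook, without Airflow's BaseHook."""


# Mirrors SimpleHook.process(), plus one duplicate input report.
collector = ToyLineageCollector()
hook = StubHook()
collector.add_input_asset(hook, uri="file:///input/data.csv")
collector.add_input_asset(hook, uri="file:///input/data.csv")
collector.add_output_asset(hook, uri="file:///output/result.csv")
```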
   
   DAG run:
   <img width="2496" height="1029" alt="image" src="https://github.com/user-attachments/assets/130028b0-7779-43ad-8981-35a57b3f607a" />
   
   
   Marquez:
   <img width="2496" height="1029" alt="image" src="https://github.com/user-attachments/assets/d3ceb23d-cdc5-4dbe-9fdd-8e94618e9fe5" />
   
   
   <img width="2496" height="1029" alt="image" src="https://github.com/user-attachments/assets/f2e66c48-50d9-4026-bb64-2faf0d8412e3" />
   
   <img width="2496" height="1029" alt="image" src="https://github.com/user-attachments/assets/f1b78bce-b9d5-4216-8809-299755a4a6bb" />
   
   
   
   

