vvijay-ad opened a new issue, #63251:
URL: https://github.com/apache/airflow/issues/63251

   ### Apache Airflow Provider(s)
   
   openlineage
   
   ### Versions of Apache Airflow Providers
   
   
   [apache-airflow-providers-openlineage](https://airflow.apache.org/docs/apache-airflow-providers-openlineage/2.8.0) 2.8.0 ([OpenLineage](https://openlineage.io/))
   
   ### Apache Airflow version
   
   2.10.5
   
   ### Operating System
   
   macOS 15.7.3 (build 24G419)
   
   ### Deployment
   
   Google Cloud Composer
   
   ### Deployment details
   
   Image version: composer-3-airflow-2.10.5-build.25
   
   
   ### What happened
   
   In Cloud Composer 3 with the OpenLineage Airflow provider, a custom 
`TokenProvider` implemented in an installed PyPI package is importable by DAG 
code but consistently fails to import when referenced via 
`openlineage.transport.auth.type` in the scheduler’s OpenLineage plugin. This 
prevents emitting DAG-level OpenLineage events from the Composer 3 scheduler.
   
   ***
   
   ### Environment
   
   - **Platform:** Google Cloud Composer 3 (GKE Autopilot, Google‑managed)
   - **Airflow:** Composer 3 default Airflow 2.10.5
   - **Lineage integration:** `apache-airflow-providers-openlineage` (>=2.8.0) 
+ OpenLineage Python client
   - **Custom package (PyPI):** `ad-openlineage-token-provider==1.0.x`
   - **Composer 3 config:**
       - Package installed via Composer PyPI dependencies (build logs show successful install in the Worker & Scheduler image)
       - OpenLineage configured through Airflow configuration override 
(`openlineage.transport`)
   
   ***
   ### Custom TokenProvider package
   
   Published to PyPI as `ad-openlineage-token-provider`, with wheel contents:
   
   ```text
   tokenproviders/__init__.py
   tokenproviders/access_key_secret_token_provider.py
   ad_openlineage_token_provider-1.0.x.dist-info/...
   ```
   
   Implementation:
   
   ```python
   # tokenproviders/access_key_secret_token_provider.py
   from openlineage.client.transport.http import TokenProvider
   import logging
   
   log = logging.getLogger(__name__)
   
   class AccessKeySecretKeyTokenProvider(TokenProvider):
       def __init__(self, config: dict[str, str]) -> None:
           super().__init__(config)
           log.info("AccessKeySecretKeyTokenProvider constructor invoked.")
   
        def get_bearer(self):
            # get the bearer based on the access_key and secret_key config
            ...
   ```
   
   The Python import path is:
   
   ```python
    from tokenproviders.access_key_secret_token_provider import AccessKeySecretKeyTokenProvider
   ```
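   Since the `get_bearer` body is elided above, here is a self-contained sketch of what such a provider could look like. The base64 "access_key:secret_key" scheme and the stub fallback for `TokenProvider` are assumptions for illustration only, not the actual implementation:

    ```python
    import base64
    import logging

    try:
        from openlineage.client.transport.http import TokenProvider
    except ImportError:
        # Stub so this sketch runs even without the OpenLineage client installed.
        class TokenProvider:
            def __init__(self, config: dict) -> None:
                self.config = config

    log = logging.getLogger(__name__)


    class AccessKeySecretKeyTokenProvider(TokenProvider):
        def __init__(self, config: dict) -> None:
            super().__init__(config)
            self.access_key = config["access_key"]
            self.secret_key = config["secret_key"]

        def get_bearer(self) -> str:
            # Hypothetical scheme: base64-encode "access_key:secret_key".
            raw = f"{self.access_key}:{self.secret_key}".encode()
            return f"Bearer {base64.b64encode(raw).decode()}"
    ```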
   
   
   ***
   
   ### OpenLineage transport configuration in Airflow
   
   Shown in Airflow UI → Admin → Configurations:
   
   ```json
   {
     "type": "http",
     "url": "https://myopenlineage-backend.mydomain.com";,
     "endpoint": "/api/v1/lineage",
     "auth": {
       "type": 
"tokenproviders.access_key_secret_token_provider.AccessKeySecretKeyTokenProvider",
       "access_key": "xxxxxx",
       "secret_key": "xxxxxx"
     }
   }
   ```
   
   This matches the working Python path in the package.
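   For context, the client resolves `auth.type` by dynamically importing the dotted path. A minimal sketch of that resolution follows; the real `openlineage.client.utils.import_from_string` may differ in details, but the failure mode is the same:

    ```python
    import importlib


    def import_from_dotted_path(path: str):
        # Split "pkg.module.ClassName" into module path and attribute name.
        module_path, _, attr = path.rpartition(".")
        # This import_module call is where ModuleNotFoundError is raised
        # when 'tokenproviders' is not on the interpreter's sys.path.
        module = importlib.import_module(module_path)
        return getattr(module, attr)


    # Works for any importable dotted path, e.g. a stdlib class:
    ordered_dict_cls = import_from_dotted_path("collections.OrderedDict")
    ```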
   
   ***
   
   ### Observed behavior
   
   1. **Package installation confirmed for scheduler image**
   
   Composer 3 Worker & Scheduler image build logs show:
   
   ```text
   Downloading ad_openlineage_token_provider-1.0.x-py3-none-any.whl ...
   Installing collected packages: ad-openlineage-token-provider
   Successfully installed ad-openlineage-token-provider-1.0.x
   ```
   
   2. **Scheduler can import the package in DAG code**
   
   A test DAG:
   
   ```python
   # dags/test_tokenproviders_import.py
   from datetime import datetime
   from airflow import DAG
    from tokenproviders.access_key_secret_token_provider import AccessKeySecretKeyTokenProvider
   
   with DAG(
       dag_id="test_tokenproviders_import",
       start_date=datetime(2024, 1, 1),
       schedule=None,
       catchup=False,
   ) as dag:
       pass
   ```
   
       - Appears as **healthy** in the Airflow UI.
       - Runs successfully (no import errors in scheduler logs for this DAG 
itself).
   3. **Scheduler OpenLineage plugin fails to import the same class**
   
   When DAG-level OpenLineage events are emitted, the scheduler logs:
   
   ```text
   WARNING No module named 'tokenproviders'
   WARNING Failed to emit OpenLineage DAG started/success event:
   ...
   ModuleNotFoundError: No module named 'tokenproviders'
   ...
    ImportError: Failed to import tokenproviders.access_key_secret_token_provider.AccessKeySecretKeyTokenProvider
   ```
   
   Stack trace shows failure in:
       - `openlineage.client.utils.import_from_string`
       - called from `openlineage.client.transport.http.create_token_provider`
       - triggered by `airflow.providers.openlineage.plugins.adapter` during 
DAG started/success callbacks.
   
   ***
   
   ### Hypothesis
   
   - In Composer 3, the scheduler’s **DAG parsing** environment and the 
**OpenLineage plugin** environment are not sharing the same `sys.path` / 
site-packages view.
   - The installed package `ad-openlineage-token-provider` (exposing 
`tokenproviders`) is visible to DAG imports but **not** visible to the Python 
interpreter/context used by `airflow.providers.openlineage.plugins.adapter` 
when calling `import_from_string(auth.type)`.
   - Because Composer 3 hides scheduler shell access and does not synchronize 
custom plugins to scheduler/triggerer, there is no supported user-level way to 
adjust or inspect the plugin’s `PYTHONPATH`.
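   One way to gather evidence for this hypothesis from user code is a small probe that records whether a module is resolvable and what `sys.path` looks like in the calling context. This is a diagnostic sketch; the function name is illustrative, and where to invoke it (DAG code vs. a scheduler-side callback) is up to the reader:

    ```python
    import importlib.util
    import sys


    def probe_import(module_name: str) -> dict:
        # Report whether module_name is resolvable from the current
        # interpreter context, plus the sys.path used for resolution.
        spec = importlib.util.find_spec(module_name)
        return {
            "module": module_name,
            "resolvable": spec is not None,
            "origin": spec.origin if spec else None,
            "sys_path": list(sys.path),
        }


    # Comparing the output when run from DAG parsing vs. logged from a
    # scheduler context would show whether the two share a sys.path view.
    ```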
   
   ***
   
   ### Expected vs actual
   
   - **Expected:** Any Python package installed into the Worker & Scheduler image via Composer’s PyPI dependencies is importable from:
       - DAG code during scheduler parsing and execution, and
       - Scheduler-resident plugins (like the OpenLineage provider) when they 
import classes by fully-qualified module path.
   - **Actual:**
       - The package is importable in DAG code (`from tokenproviders...` works).
       - The exact same module path used in `openlineage.transport.auth.type` 
fails with `ModuleNotFoundError` when OpenLineage’s scheduler plugin tries to 
import it.
   
   ***
   
   ### Impact
   
   - Custom OpenLineage token providers implemented as installed Python 
packages cannot be used in Cloud Composer 3’s scheduler.
   - DAG-level OpenLineage events (started/success/failed) cannot authenticate 
to the lineage backend using a custom TokenProvider, even though the same code 
works from workers and in local tests.
   
   ***
   
   ### Request
   
   1. **Confirm** whether this is a known limitation or bug in:
       - Cloud Composer 3’s scheduler/plugin environment, or
       - The OpenLineage Airflow provider’s integration on Composer 3.
   2. **Clarify expected behavior**:
       - Should a package installed via Composer 3 PyPI dependencies be 
importable by the OpenLineage scheduler plugin using a dotted path in 
`auth.type`?
   3. **Provide a fix or guidance** so that:
       - Installed Python packages (like `ad-openlineage-token-provider`) are 
visible in the Composer 3 scheduler OpenLineage plugin context, and
       - A custom `TokenProvider` can be referenced via 
`openlineage.transport.auth.type` as documented (e.g. 
`"mymodule.MyTokenProvider"` style paths).
   
   
   
   ### What you think should happen instead
   
       - The scheduler’s OpenLineage plugin should be able to import the module path configured in `openlineage.transport.auth.type`, just as DAG code can (`from tokenproviders...` works).
       - The Celery workers already import the `tokenproviders.*` module and emit task-level OpenLineage events successfully; the scheduler plugin should behave the same instead of failing with `ModuleNotFoundError`.
   
   ### How to reproduce
   
   1. **Create a minimal custom TokenProvider package**
   
   Project structure:
   
   ```text
   my_openlineage_token_provider/
     mymodule/
       __init__.py
       my_token_provider.py
     pyproject.toml or setup.py
   ```
   
   `mymodule/my_token_provider.py`:
   
   ```python
   from openlineage.client.transport.http import TokenProvider
   import logging
   
   log = logging.getLogger(__name__)
   
   class MyTokenProvider(TokenProvider):
       def __init__(self, config: dict[str, str]) -> None:
           super().__init__(config)
           log.info("MyTokenProvider constructed with config keys: %s",
                    list(config.keys()))
   ```
   
   Build and publish this to PyPI as `my-openlineage-token-provider`, so that 
installing it exposes a top‑level package `mymodule` with 
`mymodule.my_token_provider.MyTokenProvider`.
   2. **Install the package in Cloud Composer 3**
   
   In Composer 3:
       - Add `my-openlineage-token-provider==1.0.0` to the environment’s PyPI 
dependencies (UI or `gcloud composer environments update 
--update-pypi-package="my-openlineage-token-provider==1.0.0"`).
       - Wait for the Worker & Scheduler image build to complete; verify the build logs show:
   
   ```text
   Downloading my_openlineage_token_provider-1.0.0-py3-none-any.whl ...
   Installing collected packages: my-openlineage-token-provider
   Successfully installed my-openlineage-token-provider-1.0.0
   ```
   
   3. **Configure OpenLineage to use the custom TokenProvider**
   
   In Airflow configuration overrides for the Composer 3 environment, set:
       - Section: `openlineage`
       - Key: `transport`
       - Value (JSON):
   
   ```json
   {
     "type": "http",
     "url": "https://myopenlineage-backend.mydomain.com";,
     "endpoint": "/api/v1/lineage",
     "auth": {
       "type": "mymodule.my_token_provider.MyTokenProvider",
       "access_key": "xxxxxx",
       "secret_key": "xxxxxxxx"
     }
   }
   ```
   
   
   Confirm in Airflow UI → Admin → Configurations that this value is visible 
under `openlineage.transport`.
   4. **Create a test DAG that imports the provider**
   
   Deploy a simple DAG to the Composer 3 environment:
   
   ```python
   # dags/test_my_token_provider_import.py
   from datetime import datetime
   from airflow import DAG
   
   from mymodule.my_token_provider import MyTokenProvider
   
   with DAG(
       dag_id="test_my_token_provider_import",
       start_date=datetime(2024, 1, 1),
       schedule=None,
       catchup=False,
   ) as dag:
       pass
   ```
   
       - Verify that the DAG appears as healthy in the Airflow UI.
       - Trigger it manually and confirm there are **no import errors** for 
this DAG in scheduler logs.
   5. **Trigger any DAG and observe scheduler logs for OpenLineage**
       - Trigger `test_my_token_provider_import` or another simple DAG.
       - Check scheduler logs (Composer → Logs → airflow-scheduler) for 
OpenLineage events.
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

