aditya-7 opened a new issue, #38037:
URL: https://github.com/apache/airflow/issues/38037

   ### Apache Airflow Provider(s)
   
   openlineage
   
   ### Versions of Apache Airflow Providers
   
   <img width="600" alt="Screenshot 2024-03-11 at 6 52 24 PM" src="https://github.com/apache/airflow/assets/13845504/5bdc0551-fa98-471d-a067-0a60b7329e25">
   
   
   ### Apache Airflow version
   
   2.8.2
   
   ### Operating System
   
   Debian GNU/Linux 12 (bookworm)
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   Docker Compose version v2.24.3-desktop.
   
   Created a custom docker image using `Dockerfile`: 
   ```
   FROM apache/airflow:2.8.2
   # COPY manager.py /home/airflow/.local/lib/python3.8/site-packages/airflow/providers/openlineage/extractors/manager.py
   COPY dags /opt/airflow/dags/
   COPY plugins /opt/airflow/plugins/
   ```
   
   Changed `x-airflow-common.&airflow-common` in the `docker-compose.yml` file: 
   ```
     &airflow-common
     # image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.8.2}
     build: .
     environment:
       &airflow-common-env
       AIRFLOW__CORE__EXECUTOR: CeleryExecutor
       AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
       AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
       AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
       AIRFLOW__CORE__FERNET_KEY: ''
       AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
       AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
       AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
       AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
       _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- apache-airflow-providers-amazon}
       AIRFLOW__OPENLINEAGE__TRANSPORT: '{"type":"http","url":"http://192.168.1.40:9090"}'
       AIRFLOW__OPENLINEAGE__NAMESPACE: MyNamespace
       AIRFLOW__OPENLINEAGE__EXTRACTORS: plugins.extractors.some_lineage_extractor.MyExtractor
   ```
   
   Built & deployed using the command:
   `docker-compose build && docker-compose up`
   
   This is my project structure:
   ![Screenshot 2024-03-11 at 7 29 04 PM](https://github.com/apache/airflow/assets/13845504/ba6001c4-4d29-4821-90d0-2266b39f7759)
   
   
   ### What happened
   
   When I deploy Airflow, the airflow-scheduler and airflow-triggerer containers fail to load the openlineage plugin. They can still load the built-in extractors such as BashExtractor and PythonExtractor.
   Interestingly, the airflow-init container loads the plugin successfully. I verified this by overriding the library file `/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/openlineage/extractors/manager.py` with a copy that adds a few debug log statements.
   I modified the ExtractorManager constructor to add the debug statements like this:
   ```
   class ExtractorManager(LoggingMixin):
       """Class abstracting management of custom extractors."""
   
       def __init__(self):
           super().__init__()
           self.extractors: dict[str, type[BaseExtractor]] = {}
           self.default_extractor = DefaultExtractor
   
           # Built-in Extractors like Bash and Python
           for extractor in _iter_extractor_types():
               print(f"inbuilt extractor: {extractor}")
               for operator_class in extractor.get_operator_classnames():
                   self.extractors[operator_class] = extractor
   
           # Semicolon-separated extractors in Airflow configuration or OPENLINEAGE_EXTRACTORS variable.
           # Extractors should implement BaseExtractor
           env_extractors = conf.get("openlineage", "extractors", fallback=os.getenv("OPENLINEAGE_EXTRACTORS"))
           # skip either when it's empty string or None
           if env_extractors:
               self.log.info(f"All extractors: {env_extractors}")
               for extractor in env_extractors.split(";"):
                   self.log.info(f"extractor:{extractor}")
                   try:
                       self.log.info(f"PATH = '{os.getenv('PATH')}'")
                       self.log.info(os.listdir('/opt/airflow/plugins/extractors/'))
                   except FileNotFoundError:
                       self.log.error('Extractors directory does not exist.')
                   extractor: type[BaseExtractor] = try_import_from_string(extractor.strip())
   
                   for operator_class in extractor.get_operator_classnames():
                       if operator_class in self.extractors:
                           self.log.debug(
                               "Duplicate extractor found for `%s`. `%s` will be used instead of `%s`",
                               operator_class,
                               extractor,
                               self.extractors[operator_class],
                           )
                       self.extractors[operator_class] = extractor
   ```
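
To surface the underlying import failure in the debug output, the import step can be swapped for a stricter helper while debugging. The following is a minimal sketch using only the standard library. `import_extractor_class` is a hypothetical name and is not part of the openlineage provider; it assumes the goal is simply to see the original `ImportError` together with the `sys.path` that was in effect.

```python
import importlib
import sys


def import_extractor_class(dotted_path: str):
    """Import `package.module.ClassName` and return the class object.

    Hypothetical debugging helper (not part of Airflow): on failure it raises
    an ImportError that names the dotted path and shows the current sys.path.
    """
    module_path, _, class_name = dotted_path.strip().rpartition(".")
    if not module_path:
        raise ImportError(f"{dotted_path!r} is not a dotted 'module.Class' path")
    try:
        module = importlib.import_module(module_path)
    except ImportError as exc:
        raise ImportError(
            f"Cannot import module {module_path!r} for extractor {dotted_path!r}; "
            f"sys.path={sys.path}"
        ) from exc
    try:
        return getattr(module, class_name)
    except AttributeError as exc:
        raise ImportError(
            f"Module {module_path!r} has no attribute {class_name!r}"
        ) from exc
```

Calling it with the configured value, for example `import_extractor_class("plugins.extractors.some_lineage_extractor.MyExtractor")`, either returns the class or fails with a message that shows which module could not be found and where Python looked.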
   
   - The airflow-triggerer and the airflow-scheduler containers failed to load the openlineage plugin while trying to import the custom extractor class, with the following error:
   <img width="1754" alt="Screenshot 2024-03-11 at 7 42 22 PM" src="https://github.com/apache/airflow/assets/13845504/cfe9c0e4-7d68-4dc1-921f-32c3f12237e2">
   
   - In contrast, the airflow-init container successfully loaded the plugin with the same custom extractor:
   <img width="1761" alt="Screenshot 2024-03-11 at 7 42 02 PM" src="https://github.com/apache/airflow/assets/13845504/09766fd2-6704-494a-aa8c-3989e412d0a6">
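
Since the same image behaves differently across containers, it may help to compare the import environment of each one. The script below is a rough sketch for that comparison, not something taken from Airflow. `describe_import_environment` is a hypothetical helper; it only reports the working directory, `PYTHONPATH`, `sys.path`, and whether the module can be located.

```python
import importlib.util
import os
import sys


def describe_import_environment(dotted_module: str) -> dict:
    """Collect the facts that decide whether `dotted_module` can be imported."""
    try:
        spec = importlib.util.find_spec(dotted_module)
    except (ImportError, ValueError):
        # find_spec imports parent packages first; a missing parent package
        # raises ModuleNotFoundError instead of returning None.
        spec = None
    return {
        "module": dotted_module,
        "found": spec is not None,
        "origin": getattr(spec, "origin", None),
        "cwd": os.getcwd(),
        "pythonpath": os.environ.get("PYTHONPATH"),
        "sys_path": list(sys.path),
    }


if __name__ == "__main__":
    report = describe_import_environment("plugins.extractors.some_lineage_extractor")
    for key, value in report.items():
        print(f"{key}: {value}")
```

Running the same script in the airflow-init, airflow-scheduler, and airflow-triggerer containers and diffing the output should show whether the working directory or `sys.path` differs between them.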
   
   
   
   ### What you think should happen instead
   
   The Airflow triggerer and scheduler should also be able to import the custom extractor class, as the airflow-init container did, and successfully load the openlineage plugin.
   
   ### How to reproduce
   
   1. Create the project folder with any DAG.
   2. Write a custom extractor at `<project_root>/plugins/extractors/some_lineage_extractor.py`:
   ```
   from airflow.providers.openlineage.extractors import BaseExtractor, OperatorLineage
   from openlineage.client.run import Dataset
   
   
   class MyExtractor(BaseExtractor):
   
       @classmethod
       def get_operator_classnames(cls):
           return ['PythonOperator']
   
       def extract(self) -> OperatorLineage:
           return OperatorLineage(
               inputs=[
                   Dataset(namespace=f"s3a://{self.operator.input_bucket}", name=source)
                   for source in sorted(self.operator.resolved_source_objects)
               ],
               outputs=[
                   Dataset(namespace=f"s3a://{self.operator.output_bucket}", name=source)
                   for source in sorted(self.operator.resolved_destination_objects)
               ],
           )
   
   ``` 
   3. Create a `Dockerfile` at the project root and update the `docker-compose.yaml` file as described in the Deployment details section above.
   4. Run `docker-compose build && docker-compose up` and check the logs for the errors.
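
Whether the dotted path `plugins.extractors.some_lineage_extractor` resolves depends on which directory is on `sys.path`. The sketch below reproduces the project layout in a temporary directory and tries both spellings against both candidate roots. It is an illustration under assumptions: the stand-in module contains a placeholder class, not the real extractor, and `can_import` and `check_layout` are hypothetical helper names. My understanding from the Airflow module-management documentation is that the `plugins` folder itself is added to `sys.path`, but I have not verified that for this deployment.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def can_import(dotted_module: str, search_root: Path) -> bool:
    """Try importing dotted_module with search_root temporarily first on sys.path."""
    saved_path = list(sys.path)
    saved_modules = set(sys.modules)
    sys.path.insert(0, str(search_root))
    importlib.invalidate_caches()
    try:
        importlib.import_module(dotted_module)
        return True
    except ImportError:
        return False
    finally:
        # Undo all side effects so each check starts from a clean state.
        sys.path[:] = saved_path
        for name in set(sys.modules) - saved_modules:
            del sys.modules[name]


def check_layout() -> dict:
    """Build a throwaway copy of the project layout and test both spellings."""
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        package_dir = root / "plugins" / "extractors"
        package_dir.mkdir(parents=True)
        # Placeholder module: stands in for the real extractor file.
        (package_dir / "some_lineage_extractor.py").write_text(
            "class MyExtractor:\n    pass\n"
        )

        long_name = "plugins.extractors.some_lineage_extractor"
        short_name = "extractors.some_lineage_extractor"
        return {
            ("project root", long_name): can_import(long_name, root),
            ("project root", short_name): can_import(short_name, root),
            ("plugins dir", long_name): can_import(long_name, root / "plugins"),
            ("plugins dir", short_name): can_import(short_name, root / "plugins"),
        }


if __name__ == "__main__":
    for (on_path, name), ok in check_layout().items():
        print(f"{on_path:12} {name:45} importable={ok}")
```

If only the `plugins` directory is on `sys.path` in the failing containers, the `plugins.`-prefixed path would not resolve there, which would be one possible explanation for the difference; this is a hypothesis to check, not a confirmed cause.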
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
