aditya-7 opened a new issue, #38037: URL: https://github.com/apache/airflow/issues/38037
### Apache Airflow Provider(s)

openlineage

### Versions of Apache Airflow Providers

<img width="600" alt="Screenshot 2024-03-11 at 6 52 24 PM" src="https://github.com/apache/airflow/assets/13845504/5bdc0551-fa98-471d-a067-0a60b7329e25">

### Apache Airflow version

2.8.2

### Operating System

Debian GNU/Linux 12 (bookworm)

### Deployment

Docker-Compose

### Deployment details

Docker Compose version v2.24.3-desktop.

Created a custom docker image using `Dockerfile`:

```
FROM apache/airflow:2.8.2

# COPY manager.py /home/airflow/.local/lib/python3.8/site-packages/airflow/providers/openlineage/extractors/manager.py
COPY dags /opt/airflow/dags/
COPY plugins /opt/airflow/plugins/
```

Changed `x-airflow-common.&airflow-common` in the `docker-compose.yml` file:

```
  &airflow-common
  # image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.8.2}
  build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- apache-airflow-providers-amazon}
    AIRFLOW__OPENLINEAGE__TRANSPORT: '{"type":"http","url":"http://192.168.1.40:9090"}'
    AIRFLOW__OPENLINEAGE__NAMESPACE: MyNamespace
    AIRFLOW__OPENLINEAGE__EXTRACTORS: plugins.extractors.some_lineage_extractor.MyExtractor
```

Built & deployed using the command: `docker-compose build && docker-compose up`

This is my project structure:

### What happened

When I deploy Airflow, the airflow-scheduler and the airflow-triggerer containers fail to load the openlineage plugin.
They can load built-in extractors such as BashExtractor, PythonExtractor, etc. Interestingly, the airflow-init container was able to load the plugin successfully.

I was able to test this by overriding the library file `/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/openlineage/extractors/manager.py` with a few debug statements using the logger. I modified the ExtractorManager constructor to add the debug output like this:

```
class ExtractorManager(LoggingMixin):
    """Class abstracting management of custom extractors."""

    def __init__(self):
        super().__init__()
        self.extractors: dict[str, type[BaseExtractor]] = {}
        self.default_extractor = DefaultExtractor

        # Built-in Extractors like Bash and Python
        for extractor in _iter_extractor_types():
            print(f"inbuilt extractor: {extractor}")
            for operator_class in extractor.get_operator_classnames():
                self.extractors[operator_class] = extractor

        # Semicolon-separated extractors in Airflow configuration or OPENLINEAGE_EXTRACTORS variable.
        # Extractors should implement BaseExtractor
        env_extractors = conf.get("openlineage", "extractors", fallback=os.getenv("OPENLINEAGE_EXTRACTORS"))
        # skip either when it's empty string or None
        if env_extractors:
            self.log.info(f"All extractors: {env_extractors}")
            for extractor in env_extractors.split(";"):
                self.log.info(f"extractor:{extractor}")
                try:
                    self.log.info(f"PATH = '{os.getenv('PATH')}'")
                    self.log.info(os.listdir('/opt/airflow/plugins/extractors/'))
                except FileNotFoundError:
                    self.log.error('Extractors directory does not exist.')
                extractor: type[BaseExtractor] = try_import_from_string(extractor.strip())
                for operator_class in extractor.get_operator_classnames():
                    if operator_class in self.extractors:
                        self.log.debug(
                            "Duplicate extractor found for `%s`. `%s` will be used instead of `%s`",
                            operator_class,
                            extractor,
                            self.extractors[operator_class],
                        )
                    self.extractors[operator_class] = extractor
```

- The airflow-triggerer and the airflow-scheduler containers failed to load the openlineage plugin while trying to import the custom extractor class, with the following error:
<img width="1754" alt="Screenshot 2024-03-11 at 7 42 22 PM" src="https://github.com/apache/airflow/assets/13845504/cfe9c0e4-7d68-4dc1-921f-32c3f12237e2">

- Whereas the airflow-init container successfully loaded the plugin with the same custom extractor:
<img width="1761" alt="Screenshot 2024-03-11 at 7 42 02 PM" src="https://github.com/apache/airflow/assets/13845504/09766fd2-6704-494a-aa8c-3989e412d0a6">

### What you think should happen instead

The Airflow triggerer and the scheduler should also be able to import the custom extractor class, as the Airflow init container did, and successfully load the openlineage plugin.

### How to reproduce

1. Create the project folder with any DAG.
2. Write a custom extractor `<project_root>/plugins/extractors/some_lineage_extractor.py`
```
from airflow.providers.openlineage.extractors import BaseExtractor, OperatorLineage
from openlineage.client.run import Dataset


class MyExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls):
        return ['PythonOperator']

    def extract(self) -> OperatorLineage:
        return OperatorLineage(
            inputs=[
                Dataset(namespace=f"s3a://{self.operator.input_bucket}", name=source)
                for source in sorted(self.operator.resolved_source_objects)
            ],
            outputs=[
                Dataset(namespace=f"s3a://{self.operator.output_bucket}", name=source)
                for source in sorted(self.operator.resolved_destination_objects)
            ],
        )
```
3. Create a `Dockerfile` at the project root and update the `docker-compose.yaml` file as described in the Deployment details section above.
4. Run `docker-compose build && docker-compose up` to see the errors in the log.

### Anything else

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
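
Since the same extractor path imports in airflow-init but not in the scheduler or triggerer, it may help to compare how each container resolves that path. Below is a minimal diagnostic sketch, not part of Airflow or the openlineage provider: `check_import` is a hypothetical helper name, and it only approximates what the provider does when it imports the configured class. It splits a dotted `module.ClassName` path, attempts the import, and returns the outcome together with the interpreter's `sys.path`.

```python
import importlib
import sys


def check_import(dotted_path: str) -> dict:
    """Try to import `package.module.ClassName` and report what happened.

    Hypothetical diagnostic helper; it does not call any Airflow code.
    """
    # Split "a.b.C" into module "a.b" and attribute "C".
    module_path, _, attr = dotted_path.rpartition(".")
    result = {"path": dotted_path, "ok": False, "error": None, "sys_path": list(sys.path)}
    try:
        module = importlib.import_module(module_path)
        getattr(module, attr)  # raises AttributeError if the class is missing
        result["ok"] = True
    except (ImportError, AttributeError, ValueError) as exc:
        # ModuleNotFoundError is a subclass of ImportError;
        # ValueError covers an empty module name (no dot in the path).
        result["error"] = f"{type(exc).__name__}: {exc}"
    return result
```

Calling `check_import("plugins.extractors.some_lineage_extractor.MyExtractor")` from a Python shell inside each container (for example via `docker-compose exec`) and comparing the `error` and `sys_path` values between airflow-init and airflow-scheduler could show whether the two processes search different directories.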
