[ https://issues.apache.org/jira/browse/AIRFLOW-2214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397612#comment-16397612 ]
Paymahn Moghadasian commented on AIRFLOW-2214: ---------------------------------------------- I've also tried naming my plugin "test" ie: {code:python} class MordorPlugin(AirflowPlugin): name = "test" operators = [MordorOperator] {code} and using new Airflow 2.0 style imports: {code:python} from airflow import DAG from airflow.operators.test import MordorOperator from datetime import datetime dag = DAG('mordor_dag', description='DAG with a single task', start_date=datetime.today(), catchup=False) mordor = MordorOperator(job="testing", task_id='run_single_task', dag=dag) {code} But that still fails: {code:bash} (venv) 2:35PM /Users/paymahn/solvvy/scheduler ✘ 1 mordor.operator ✱ ❯❯❯ AIRFLOW_USE_NEW_IMPORTS=1 python3 -m unittest section/key [core/airflow-home] not found in config section/key [core/airflow-home] not found in config section/key [core/airflow-home] not found in config section/key [core/airflow-home] not found in config Traceback (most recent call last): File "/Users/paymahn/solvvy/scheduler/dags/mordor_test.py", line 2, in <module> from airflow.operators.test import MordorOperator ModuleNotFoundError: No module named 'airflow.operators.test' ........ ====================================================================== FAIL: test_dags_parse (tests.test_dags.TestDagsAreSane) (dag='mordor_test.py') ---------------------------------------------------------------------- Traceback (most recent call last): File "/Users/paymahn/solvvy/scheduler/tests/test_dags.py", line 18, in test_dags_parse self.assertEqual(0, subprocess.run(['python3', os.path.join(os.getcwd(), 'dags', dag)]).returncode) AssertionError: 0 != 1 ---------------------------------------------------------------------- Ran 9 tests in 3.551s FAILED (failures=1) {code} However, these problems don't persist when running the webserver, scheduler or worker. > Import error when interpreting DAG with customer operator > --------------------------------------------------------- > > Key: AIRFLOW-2214 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2214 > Project: Apache Airflow > Issue Type: Bug > Components: DAG, operators, plugins > Affects Versions: 1.9.0 > Reporter: Paymahn Moghadasian > Priority: Minor > > The [airflow docs|(https://airflow.apache.org/tutorial.html#testing] suggest > that a basic sanity check for a DAG file is to interpret it. ie: > {code:bash} > $ python ~/path/to/my/dag.py > {code} > I've found this to be useful. However, now I've created a plugin, > `MordorOperator` under `$AIRFLOW_HOME/plugins`: > {code:python} > from airflow.plugins_manager import AirflowPlugin > from airflow.utils.decorators import apply_defaults > from airflow.operators import BaseOperator > from airflow.exceptions import AirflowException > class MordorOperator(BaseOperator): > JOB_QUEUE_MAPPING = {"testing": "testing"} > @apply_defaults > def __init__(self, job, *args, **kwargs): > super().__init__(*args, **kwargs) > def execute(self, context): > print("Executing mordoroperator") > class MordorPlugin(AirflowPlugin): > name = "MordorPlugin" > operators = [MordorOperator] > {code} > I can import the plugin and see it work in a sample DAG: > {code:python} > from airflow import DAG > from airflow.operators import MordorOperator > from datetime import datetime > dag = DAG('mordor_dag', description='DAG with a single task', > start_date=datetime.today(), catchup=False) > mordor = MordorOperator(job="testing", task_id='run_single_task', dag=dag) > {code} > However, when I try to interpret this file I get failures which I suspect I > shouldn't get since the plugin successfully runs. My suspicion is that this > is because there's some dynamic code gen happening at runtime which isn't > available when a DAG is interpreted by itself. I also find that PyCharm can't > perform any autocompletion when importing the plugin. > {code:bash} > (venv) 3:54PM /Users/paymahn/solvvy/scheduler mordor.operator ✱ > ❮❮❮ python dags/mordor_test.py > section/key [core/airflow-home] not found in config > Traceback (most recent call last): > File "dags/mordor_test.py", line 2, in <module> > from airflow.operators import MordorOperator > ImportError: cannot import name 'MordorOperator' > {code} > How can a DAG using a plugin be sanity tested? Is there another mechanism to > import the custom operator which will allow interpreting the DAG from the > commandline? -- This message was sent by Atlassian JIRA (v7.6.3#76005)