Darwin Yip created AIRFLOW-6785:
-----------------------------------
Summary: DagBag tries to run hook inside SubDagOperator
Key: AIRFLOW-6785
URL: https://issues.apache.org/jira/browse/AIRFLOW-6785
Project: Apache Airflow
Issue Type: Bug
Components: DAG, hooks, operators
Affects Versions: 1.10.9
Reporter: Darwin Yip
The following worked in 1.10.1, but does not work in 1.10.9. It seems that the
DagBag tries to execute the hook inside the SubDagOperator, which in turn tries
to connect to the database.
Assuming the following connection is defined as an environment variable:
{code:python}
AIRFLOW_CONN_POSTGRES_CONN=postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
{code}
{code:python}
import unittest
from airflow.models import DagBag
class TestDags(unittest.TestCase):
    """
    Generic tests that all DAGs in the repository should be able to pass.
    """

    # Not referenced by any test shown in this snippet.
    LOAD_SECOND_THRESHOLD = 2

    def setUp(self):
        """Create a DagBag with default arguments for each test to inspect."""
        self.dagbag = DagBag()

    def test_dagbag_import(self):
        """
        Verify that Airflow will be able to import all DAGs in the repository.
        """
        # An empty ``import_errors`` has length 0, which is falsy; anything
        # else fails the assertion and the errors are echoed in the message.
        self.assertFalse(
            len(self.dagbag.import_errors),
            'There should be no DAG failures. Got: {}'.format(
                self.dagbag.import_errors
            )
        )
{code}
{code:python}
from datetime import datetime, timedelta
from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
# Arguments shared by the parent DAG and, via ``subdag(...)``, the child DAG.
default_args = {
    # Ownership and scheduling.
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2020, 2, 2),

    # E-mail notification settings.
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,

    # Retry policy.
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}
def get_data(**kwargs):
    """Run ``select 1;`` through a ``PostgresHook`` and return the result.

    The hook is only constructed when this callable is invoked; defining the
    function does not create it.

    :param kwargs: keyword arguments supplied by the calling operator; must
        contain ``postgres_conn_id``, the connection id handed to the hook.
    :return: whatever ``PostgresHook.get_pandas_df`` returns for the query
        (a pandas DataFrame, going by the method name).
    :raises KeyError: if ``postgres_conn_id`` is missing from ``kwargs``.
    """
    hook = PostgresHook(postgres_conn_id=kwargs['postgres_conn_id'])
    return hook.get_pandas_df("select 1;")
def subdag(parent_dag_name, child_dag_name, args):
    """Build the child DAG that the parent's ``SubDagOperator`` wraps.

    :param parent_dag_name: dag_id of the parent DAG.
    :param child_dag_name: name of the child; joined to the parent name with
        a dot to form the child's dag_id.
    :param args: ``default_args`` dictionary passed to the child DAG.
    :return: the child ``DAG`` holding a single ``get_data_sub`` task.
    """
    dag_subdag = DAG(
        dag_id='%s.%s' % (parent_dag_name, child_dag_name),
        default_args=args,
        schedule_interval=None,
    )

    # The operator is tied to the child DAG through its ``dag=`` argument, so
    # no local reference to it is kept. ``get_data`` is passed as a callable
    # here; it is not called in this function.
    PythonOperator(
        task_id='get_data_sub',
        python_callable=get_data,
        op_kwargs={
            'postgres_conn_id': 'postgres_conn'
        },
        provide_context=True,
        dag=dag_subdag
    )

    return dag_subdag
# Parent DAG; unscheduled (``schedule_interval=None``).
dag = DAG(
    dag_id="test-hook-sub",
    default_args=default_args,
    schedule_interval=None,
)

# Entry task of the parent DAG.
start = DummyOperator(task_id='kick_off', dag=dag)

# Wrap the child DAG built by ``subdag(...)``; the child's dag_id becomes
# "test-hook-sub.section-1".
section_1 = SubDagOperator(
    task_id='section-1',
    subdag=subdag(
        parent_dag_name="test-hook-sub",
        child_dag_name='section-1',
        args=default_args,
    ),
    dag=dag,
)

# kick_off runs before section-1.
start >> section_1
{code}
Error:
{code:python}
psycopg2.OperationalError: could not translate host name "postgres" to address:
Name or service not known
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)