Darwin Yip created AIRFLOW-6785:
-----------------------------------

             Summary: DagBag tries to run hook inside SubDagOperator
                 Key: AIRFLOW-6785
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6785
             Project: Apache Airflow
          Issue Type: Bug
          Components: DAG, hooks, operators
    Affects Versions: 1.10.9
            Reporter: Darwin Yip


The following setup worked in 1.10.1 but fails in 1.10.9. It seems that the
DagBag tries to execute the hook inside the SubDagOperator, which then tries to
connect to the database.

Assuming the following environment variable is set:
{code:bash}
AIRFLOW_CONN_POSTGRES_CONN=postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
{code}
 
{code:python}
import unittest

from airflow.models import DagBag


class TestDags(unittest.TestCase):
    """
    Sanity checks that every DAG in the repository is expected to pass.
    """
    # Not referenced by the test below; presumably a load-time budget in
    # seconds for a DAG-load-time check -- TODO confirm or remove.
    LOAD_SECOND_THRESHOLD = 2

    def setUp(self):
        # Build a fresh DagBag for each test; its import_errors are inspected
        # directly afterwards.
        self.dagbag = DagBag()

    def test_dagbag_import(self):
        """
        Check that no DAG file in the repository failed to import.
        """
        errors = self.dagbag.import_errors
        failure_msg = 'There should be no DAG failures. Got: {}'.format(errors)
        # Any entry in import_errors (non-zero length) fails the test.
        self.assertFalse(len(errors), failure_msg)

{code}
 

 
{code:python}
from datetime import datetime, timedelta

from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator

# Default task arguments for the parent DAG. The same mapping is also handed
# to subdag() for the child DAG.
default_args = dict(
    owner="airflow",
    depends_on_past=False,
    start_date=datetime(2020, 2, 2),
    # NOTE(review): this value looks like an email-obfuscation artifact from
    # the mailing-list archive; substitute a real address if reusing this.
    email=["[email protected]"],
    email_on_failure=False,
    email_on_retry=False,
    # Retry a failed task once, five minutes later.
    retries=1,
    retry_delay=timedelta(minutes=5),
)


def get_data(**kwargs):
    """Run ``select 1;`` through a PostgresHook and return the result.

    The connection id is taken from the ``postgres_conn_id`` keyword argument
    (a KeyError is raised if it is absent); any other keyword arguments are
    ignored. The hook is created only when this function is called, not when
    the module is imported.

    Returns:
        Whatever ``PostgresHook.get_pandas_df`` returns for the query
        (presumably a pandas DataFrame).
    """
    conn_id = kwargs['postgres_conn_id']
    hook = PostgresHook(postgres_conn_id=conn_id)
    return hook.get_pandas_df("select 1;")


def subdag(parent_dag_name, child_dag_name, args):
    """Build the child DAG that a SubDagOperator runs.

    Args:
        parent_dag_name: dag_id of the parent DAG.
        child_dag_name: name of the child section. The child dag_id becomes
            ``"<parent_dag_name>.<child_dag_name>"`` (presumably this must
            match the SubDagOperator's task_id, as Airflow's SubDagOperator
            expects -- confirm against the Airflow docs).
        args: default_args applied to the tasks of the child DAG.

    Returns:
        The child DAG. It contains a single PythonOperator task,
        ``get_data_sub``. ``get_data`` is passed as a callable reference,
        not called, so building this DAG does not itself run the query.
    """
    dag_subdag = DAG(
        dag_id=f'{parent_dag_name}.{child_dag_name}',
        default_args=args,
        schedule_interval=None,
    )

    # Passing ``dag=`` attaches the task to dag_subdag, so no local reference
    # to the operator is needed afterwards.
    PythonOperator(
        task_id='get_data_sub',
        python_callable=get_data,
        op_kwargs={
            'postgres_conn_id': 'postgres_conn'
        },
        provide_context=True,
        dag=dag_subdag,
    )

    return dag_subdag


# Parent DAG: one kick-off task followed by a single SubDagOperator section.
dag = DAG(
    "test-hook-sub",
    default_args=default_args,
    schedule_interval=None,
)

start = DummyOperator(task_id='kick_off', dag=dag)

# The child DAG's parent id is taken from dag.dag_id instead of repeating the
# literal, so the two cannot drift apart.
section_1 = SubDagOperator(
    task_id='section-1',
    subdag=subdag(dag.dag_id, 'section-1', default_args),
    dag=dag,
)

# Same dependency as ``start >> section_1``: kick_off runs before section-1.
start.set_downstream(section_1)
{code}
Error:
{noformat}
psycopg2.OperationalError: could not translate host name "postgres" to address: Name or service not known
{noformat}
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to