[ https://issues.apache.org/jira/browse/AIRFLOW-6785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Darwin Yip updated AIRFLOW-6785:
--------------------------------
    Description: 
The following worked in 1.10.1, but not in 1.10.9. When the DagBag parses the DAG file, SubDagOperator's constructor runs a Pool query against the metadata database (subdag_operator.py:77 in the traceback below), so merely importing a DAG that contains a SubDagOperator attempts a database connection.

Assuming:
{code:bash}
AIRFLOW_CONN_POSTGRES_CONN=postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
{code}
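
(Airflow derives the connection id from the environment variable name, so AIRFLOW_CONN_POSTGRES_CONN is looked up as postgres_conn, the id the DAGs below use. A quick sanity check, assuming the variable is exported:)
{code:python}
from airflow.hooks.base_hook import BaseHook

# Resolves the URI from AIRFLOW_CONN_POSTGRES_CONN in the environment
conn = BaseHook.get_connection('postgres_conn')
print(conn.host)  # expected: 'postgres'
{code}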
 
{code:python}
import unittest

from airflow.models import DagBag


class TestDags(unittest.TestCase):
    """
    Generic tests that all DAGs in the repository should be able to pass.
    """
    LOAD_SECOND_THRESHOLD = 2

    def setUp(self):
        self.dagbag = DagBag()

    def test_dagbag_import(self):
        """
        Verify that Airflow will be able to import all DAGs in the repository.
        """
        self.assertFalse(
            len(self.dagbag.import_errors),
            'There should be no DAG failures. Got: {}'.format(
                self.dagbag.import_errors
            )
        )

{code}
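
One way to keep this test hermetic in CI is to point Airflow at a throwaway local metadata database, so the parse-time Pool query has something to connect to. A minimal sketch, assuming SQLite is acceptable for tests (the path is a placeholder; the variable must be set before the first airflow import, since the config is read at import time):
{code:python}
import os

# Must run before anything imports airflow:
os.environ['AIRFLOW__CORE__SQL_ALCHEMY_CONN'] = 'sqlite:////tmp/airflow-test.db'

from airflow.utils.db import initdb  # noqa: E402

initdb()  # creates the metadata tables (including Pool) in the local SQLite DB
{code}

The DAG under test: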
{code:python}
from datetime import datetime, timedelta

from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2020, 2, 2),
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


def get_data(**kwargs):
    df = PostgresHook(
        postgres_conn_id=kwargs['postgres_conn_id']
    ).get_pandas_df("select 1;")
    return df


def subdag(parent_dag_name, child_dag_name, args):

    dag_subdag = DAG(
        dag_id='%s.%s' % (parent_dag_name, child_dag_name),
        default_args=args,
        schedule_interval=None,
    )

    run_query = PythonOperator(
        task_id='get_data_sub',
        python_callable=get_data,
        op_kwargs={
            'postgres_conn_id': 'postgres_conn'
        },
        provide_context=True,
        dag=dag_subdag
    )

    return dag_subdag


dag = DAG("test-hook-sub", default_args=default_args, schedule_interval=None)

start = DummyOperator(
    task_id='kick_off',
    dag=dag
)

section_1 = SubDagOperator(
    task_id='section-1',
    subdag=subdag("test-hook-sub", 'section-1', default_args),
    dag=dag,
)

start >> section_1
{code}
Error:
{code:python}
../lib/python3.6/site-packages/airflow/utils/db.py:74: in wrapper
    return func(*args, **kwargs)
../lib/python3.6/site-packages/airflow/utils/decorators.py:98: in wrapper
    result = func(*args, **kwargs)
../lib/python3.6/site-packages/airflow/operators/subdag_operator.py:77: in __init__
    .filter(Pool.pool == self.pool)
../lib/python3.6/site-packages/sqlalchemy/orm/query.py:3287: in first
    ret = list(self[0:1])
../lib/python3.6/site-packages/sqlalchemy/orm/query.py:3065: in __getitem__
    return list(res)
../lib/python3.6/site-packages/sqlalchemy/orm/query.py:3389: in __iter__
    return self._execute_and_instances(context)
../lib/python3.6/site-packages/sqlalchemy/orm/query.py:3411: in _execute_and_instances
    querycontext, self._connection_from_session, close_with_result=True
../lib/python3.6/site-packages/sqlalchemy/orm/query.py:3426: in _get_bind_args
    mapper=self._bind_mapper(), clause=querycontext.statement, **kw
../lib/python3.6/site-packages/sqlalchemy/orm/query.py:3404: in _connection_from_session
    conn = self.session.connection(**kw)
../lib/python3.6/site-packages/sqlalchemy/orm/session.py:1133: in connection
    execution_options=execution_options,
../lib/python3.6/site-packages/sqlalchemy/orm/session.py:1139: in _connection_for_bind
    engine, execution_options
../lib/python3.6/site-packages/sqlalchemy/orm/session.py:432: in _connection_for_bind
    conn = bind._contextual_connect()
../lib/python3.6/site-packages/sqlalchemy/engine/base.py:2242: in _contextual_connect
    self._wrap_pool_connect(self.pool.connect, None),
../lib/python3.6/site-packages/sqlalchemy/engine/base.py:2280: in _wrap_pool_connect
    e, dialect, self
../lib/python3.6/site-packages/sqlalchemy/engine/base.py:1547: in _handle_dbapi_exception_noconnection
    util.raise_from_cause(sqlalchemy_exception, exc_info)
../lib/python3.6/site-packages/sqlalchemy/util/compat.py:398: in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
../lib/python3.6/site-packages/sqlalchemy/util/compat.py:152: in reraise
    raise value.with_traceback(tb)
../lib/python3.6/site-packages/sqlalchemy/engine/base.py:2276: in _wrap_pool_connect
    return fn()
../lib/python3.6/site-packages/sqlalchemy/pool/base.py:363: in connect
    return _ConnectionFairy._checkout(self)
../lib/python3.6/site-packages/sqlalchemy/pool/base.py:773: in _checkout
    fairy = _ConnectionRecord.checkout(pool)
../lib/python3.6/site-packages/sqlalchemy/pool/base.py:492: in checkout
    rec = pool._do_get()
../lib/python3.6/site-packages/sqlalchemy/pool/impl.py:139: in _do_get
    self._dec_overflow()
../lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py:68: in __exit__
    compat.reraise(exc_type, exc_value, exc_tb)
../lib/python3.6/site-packages/sqlalchemy/util/compat.py:153: in reraise
    raise value
../lib/python3.6/site-packages/sqlalchemy/pool/impl.py:136: in _do_get
    return self._create_connection()
../lib/python3.6/site-packages/sqlalchemy/pool/base.py:308: in _create_connection
    return _ConnectionRecord(self)
../lib/python3.6/site-packages/sqlalchemy/pool/base.py:437: in __init__
    self.__connect(first_connect_check=True)
../lib/python3.6/site-packages/sqlalchemy/pool/base.py:652: in __connect
    connection = pool._invoke_creator(self)
../lib/python3.6/site-packages/sqlalchemy/engine/strategies.py:114: in connect
    return dialect.connect(*cargs, **cparams)
../lib/python3.6/site-packages/sqlalchemy/engine/default.py:489: in connect
    return self.dbapi.connect(*cargs, **cparams)
../lib/python3.6/site-packages/psycopg2/__init__.py:126: in connect
    conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
E   sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) could not translate host name "postgres" to address: Name or service not known
{code}
 

However, the non-SubDag version passes the test:
{code:python}
from datetime import datetime, timedelta

from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2020, 2, 2),
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


def get_data(**kwargs):
    """
    Returns DB data as a Pandas DataFrame
    """
    df = PostgresHook(
        postgres_conn_id=kwargs['postgres_conn_id']
    ).get_pandas_df("select 1;")
    return df


dag = DAG("test-hook", default_args=default_args, schedule_interval=None)

start = DummyOperator(
    task_id='kick_off',
    dag=dag
)


run_query = PythonOperator(
    task_id='get_data',
    python_callable=get_data,
    op_kwargs={
        'postgres_conn_id': 'postgres_conn'
    },
    provide_context=True,
    dag=dag
)

start >> run_query

{code}
 


> DagBag tries to run hook inside SubDagOperator
> ----------------------------------------------
>
>                 Key: AIRFLOW-6785
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6785
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: DAG, hooks, operators
>    Affects Versions: 1.10.4, 1.10.5, 1.10.6, 1.10.7, 1.10.8, 1.10.9
>            Reporter: Darwin Yip
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
