[
https://issues.apache.org/jira/browse/AIRFLOW-6785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Darwin Yip updated AIRFLOW-6785:
--------------------------------
Description:
The following worked in 1.10.1, but not in 1.10.9. It seems that the DagBag
tries to execute the hook inside the SubDagOperator, which tries to connect to
the database.
Assuming:
{code:python}
AIRFLOW_CONN_POSTGRES_CONN=postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
{code}
{code:python}
import unittest
from airflow.models import DagBag
class TestDags(unittest.TestCase):
"""
Generic tests that all DAGs in the repository should be able to pass.
"""
LOAD_SECOND_THRESHOLD = 2
def setUp(self):
self.dagbag = DagBag()
def test_dagbag_import(self):
"""
Verify that Airflow will be able to import all DAGs in the repository.
"""
self.assertFalse(
len(self.dagbag.import_errors),
'There should be no DAG failures. Got: {}'.format(
self.dagbag.import_errors
)
)
{code}
{code:python}
from datetime import datetime, timedelta
from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2020, 2, 2),
"email": ["[email protected]"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
def get_data(**kwargs):
df = PostgresHook(
postgres_conn_id=kwargs['postgres_conn_id']
).get_pandas_df("select 1;")
return df
def subdag(parent_dag_name, child_dag_name, args):
dag_subdag = DAG(
dag_id='%s.%s' % (parent_dag_name, child_dag_name),
default_args=args,
schedule_interval=None,
)
run_query = PythonOperator(
task_id=f'get_data_sub',
python_callable=get_data,
op_kwargs={
'postgres_conn_id': 'postgres_conn'
},
provide_context=True,
dag=dag_subdag
)
return dag_subdag
dag = DAG("test-hook-sub", default_args=default_args, schedule_interval=None)
start = DummyOperator(
task_id='kick_off',
dag=dag
)
section_1 = SubDagOperator(
task_id='section-1',
subdag=subdag("test-hook-sub", 'section-1', default_args),
dag=dag,
)
start >> section_1
{code}
Error:
{code:python}
../lib/python3.6/site-packages/airflow/utils/db.py:74: in wrapper
return func(*args, **kwargs)
../lib/python3.6/site-packages/airflow/utils/decorators.py:98: in wrapper
result = func(*args, **kwargs)
../lib/python3.6/site-packages/airflow/operators/subdag_operator.py:77: in
__init__
.filter(Pool.pool == self.pool)
../lib/python3.6/site-packages/sqlalchemy/orm/query.py:3287: in first
ret = list(self[0:1])
../lib/python3.6/site-packages/sqlalchemy/orm/query.py:3065: in __getitem__
return list(res)
../lib/python3.6/site-packages/sqlalchemy/orm/query.py:3389: in __iter__
return self._execute_and_instances(context)
../lib/python3.6/site-packages/sqlalchemy/orm/query.py:3411: in
_execute_and_instances
querycontext, self._connection_from_session, close_with_result=True
../lib/python3.6/site-packages/sqlalchemy/orm/query.py:3426: in _get_bind_args
mapper=self._bind_mapper(), clause=querycontext.statement, **kw
../lib/python3.6/site-packages/sqlalchemy/orm/query.py:3404: in
_connection_from_session
conn = self.session.connection(**kw)
../lib/python3.6/site-packages/sqlalchemy/orm/session.py:1133: in connection
execution_options=execution_options,
../lib/python3.6/site-packages/sqlalchemy/orm/session.py:1139: in
_connection_for_bind
engine, execution_options
../lib/python3.6/site-packages/sqlalchemy/orm/session.py:432: in
_connection_for_bind
conn = bind._contextual_connect()
../lib/python3.6/site-packages/sqlalchemy/engine/base.py:2242: in
_contextual_connect
self._wrap_pool_connect(self.pool.connect, None),
../lib/python3.6/site-packages/sqlalchemy/engine/base.py:2280: in
_wrap_pool_connect
e, dialect, self
../lib/python3.6/site-packages/sqlalchemy/engine/base.py:1547: in
_handle_dbapi_exception_noconnection
util.raise_from_cause(sqlalchemy_exception, exc_info)
../lib/python3.6/site-packages/sqlalchemy/util/compat.py:398: in
raise_from_cause
reraise(type(exception), exception, tb=exc_tb, cause=cause)
../lib/python3.6/site-packages/sqlalchemy/util/compat.py:152: in reraise
raise value.with_traceback(tb)
../lib/python3.6/site-packages/sqlalchemy/engine/base.py:2276: in
_wrap_pool_connect
return fn()
../lib/python3.6/site-packages/sqlalchemy/pool/base.py:363: in connect
return _ConnectionFairy._checkout(self)
../lib/python3.6/site-packages/sqlalchemy/pool/base.py:773: in _checkout
fairy = _ConnectionRecord.checkout(pool)
../lib/python3.6/site-packages/sqlalchemy/pool/base.py:492: in checkout
rec = pool._do_get()
../lib/python3.6/site-packages/sqlalchemy/pool/impl.py:139: in _do_get
self._dec_overflow()
../lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py:68: in __exit__
compat.reraise(exc_type, exc_value, exc_tb)
../lib/python3.6/site-packages/sqlalchemy/util/compat.py:153: in reraise
raise value
../lib/python3.6/site-packages/sqlalchemy/pool/impl.py:136: in _do_get
return self._create_connection()
../lib/python3.6/site-packages/sqlalchemy/pool/base.py:308: in
_create_connection
return _ConnectionRecord(self)
../lib/python3.6/site-packages/sqlalchemy/pool/base.py:437: in __init__
self.__connect(first_connect_check=True)
../lib/python3.6/site-packages/sqlalchemy/pool/base.py:652: in __connect
connection = pool._invoke_creator(self)
../lib/python3.6/site-packages/sqlalchemy/engine/strategies.py:114: in connect
return dialect.connect(*cargs, **cparams)
../lib/python3.6/site-packages/sqlalchemy/engine/default.py:489: in connect
return self.dbapi.connect(*cargs, **cparams)
../lib/python3.6/site-packages/psycopg2/__init__.py:126: in connect
conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
E sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) could not
translate host name "postgres" to address: Name or service not known
{code}
However the non-subdag version passes the test:
{code:python}
from datetime import datetime, timedelta
from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2020, 2, 2),
"email": ["[email protected]"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
def get_data(**kwargs):
"""
Returns DB data as a Pandas DataFrame
"""
df = PostgresHook(
postgres_conn_id=kwargs['postgres_conn_id']
).get_pandas_df("select 1;")
return df
dag = DAG("test-hook", default_args=default_args, schedule_interval=None)
start = DummyOperator(
task_id='kick_off',
dag=dag
)
run_query = PythonOperator(
task_id=f'get_data',
python_callable=get_data,
op_kwargs={
'postgres_conn_id': 'postgres_conn'
},
provide_context=True,
dag=dag
)
start >> run_query
{code}
was:
The following worked in 1.10.1, but not in 1.10.9. It seems that the DagBag
tries to execute the hook inside the SubDagOperator, which tries to connect to
the database.
Assuming:
{code:python}
AIRFLOW_CONN_POSTGRES_CONN=postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
{code}
{code:python}
import unittest
from airflow.models import DagBag
class TestDags(unittest.TestCase):
"""
Generic tests that all DAGs in the repository should be able to pass.
"""
LOAD_SECOND_THRESHOLD = 2
def setUp(self):
self.dagbag = DagBag()
def test_dagbag_import(self):
"""
Verify that Airflow will be able to import all DAGs in the repository.
"""
self.assertFalse(
len(self.dagbag.import_errors),
'There should be no DAG failures. Got: {}'.format(
self.dagbag.import_errors
)
)
{code}
{code:python}
from datetime import datetime, timedelta
from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2020, 2, 2),
"email": ["[email protected]"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
def get_data(**kwargs):
df = PostgresHook(
postgres_conn_id=kwargs['postgres_conn_id']
).get_pandas_df("select 1;")
return df
def subdag(parent_dag_name, child_dag_name, args):
dag_subdag = DAG(
dag_id='%s.%s' % (parent_dag_name, child_dag_name),
default_args=args,
schedule_interval=None,
)
run_query = PythonOperator(
task_id=f'get_data_sub',
python_callable=get_data,
op_kwargs={
'postgres_conn_id': 'postgres_conn'
},
provide_context=True,
dag=dag_subdag
)
return dag_subdag
dag = DAG("test-hook-sub", default_args=default_args, schedule_interval=None)
start = DummyOperator(
task_id='kick_off',
dag=dag
)
section_1 = SubDagOperator(
task_id='section-1',
subdag=subdag("test-hook-sub", 'section-1', default_args),
dag=dag,
)
start >> section_1
{code}
Error:
{code:python}
psycopg2.OperationalError: could not translate host name "postgres" to address:
Name or service not known
{code}
However the non-subdag version passes the test:
{code:python}
from datetime import datetime, timedelta
from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2020, 2, 2),
"email": ["[email protected]"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
def get_data(**kwargs):
"""
Returns DB data as a Pandas DataFrame
"""
df = PostgresHook(
postgres_conn_id=kwargs['postgres_conn_id']
).get_pandas_df("select 1;")
return df
dag = DAG("test-hook", default_args=default_args, schedule_interval=None)
start = DummyOperator(
task_id='kick_off',
dag=dag
)
run_query = PythonOperator(
task_id=f'get_data',
python_callable=get_data,
op_kwargs={
'postgres_conn_id': 'postgres_conn'
},
provide_context=True,
dag=dag
)
start >> run_query
{code}
> DagBag tries to run hook inside SubDagOperator
> ----------------------------------------------
>
> Key: AIRFLOW-6785
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6785
> Project: Apache Airflow
> Issue Type: Bug
> Components: DAG, hooks, operators
> Affects Versions: 1.10.4, 1.10.5, 1.10.6, 1.10.7, 1.10.8, 1.10.9
> Reporter: Darwin Yip
> Priority: Major
>
> The following worked in 1.10.1, but not in 1.10.9. It seems that the DagBag
> tries to execute the hook inside the SubDagOperator, which tries to connect
> to the database.
> Assuming:
> {code:python}
> AIRFLOW_CONN_POSTGRES_CONN=postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
> {code}
>
> {code:python}
> import unittest
> from airflow.models import DagBag
> class TestDags(unittest.TestCase):
> """
> Generic tests that all DAGs in the repository should be able to pass.
> """
> LOAD_SECOND_THRESHOLD = 2
> def setUp(self):
> self.dagbag = DagBag()
> def test_dagbag_import(self):
> """
> Verify that Airflow will be able to import all DAGs in the repository.
> """
> self.assertFalse(
> len(self.dagbag.import_errors),
> 'There should be no DAG failures. Got: {}'.format(
> self.dagbag.import_errors
> )
> )
> {code}
>
>
> {code:python}
> from datetime import datetime, timedelta
> from airflow import DAG
> from airflow.hooks.postgres_hook import PostgresHook
> from airflow.operators.dummy_operator import DummyOperator
> from airflow.operators.python_operator import PythonOperator
> from airflow.operators.subdag_operator import SubDagOperator
> default_args = {
> "owner": "airflow",
> "depends_on_past": False,
> "start_date": datetime(2020, 2, 2),
> "email": ["[email protected]"],
> "email_on_failure": False,
> "email_on_retry": False,
> "retries": 1,
> "retry_delay": timedelta(minutes=5),
> }
> def get_data(**kwargs):
> df = PostgresHook(
> postgres_conn_id=kwargs['postgres_conn_id']
> ).get_pandas_df("select 1;")
> return df
> def subdag(parent_dag_name, child_dag_name, args):
> dag_subdag = DAG(
> dag_id='%s.%s' % (parent_dag_name, child_dag_name),
> default_args=args,
> schedule_interval=None,
> )
> run_query = PythonOperator(
> task_id=f'get_data_sub',
> python_callable=get_data,
> op_kwargs={
> 'postgres_conn_id': 'postgres_conn'
> },
> provide_context=True,
> dag=dag_subdag
> )
> return dag_subdag
> dag = DAG("test-hook-sub", default_args=default_args, schedule_interval=None)
> start = DummyOperator(
> task_id='kick_off',
> dag=dag
> )
> section_1 = SubDagOperator(
> task_id='section-1',
> subdag=subdag("test-hook-sub", 'section-1', default_args),
> dag=dag,
> )
> start >> section_1
> {code}
> Error:
> {code:python}
> ../lib/python3.6/site-packages/airflow/utils/db.py:74: in wrapper
> return func(*args, **kwargs)
> ../lib/python3.6/site-packages/airflow/utils/decorators.py:98: in wrapper
> result = func(*args, **kwargs)
> ../lib/python3.6/site-packages/airflow/operators/subdag_operator.py:77: in
> __init__
> .filter(Pool.pool == self.pool)
> ../lib/python3.6/site-packages/sqlalchemy/orm/query.py:3287: in first
> ret = list(self[0:1])
> ../lib/python3.6/site-packages/sqlalchemy/orm/query.py:3065: in __getitem__
> return list(res)
> ../lib/python3.6/site-packages/sqlalchemy/orm/query.py:3389: in __iter__
> return self._execute_and_instances(context)
> ../lib/python3.6/site-packages/sqlalchemy/orm/query.py:3411: in
> _execute_and_instances
> querycontext, self._connection_from_session, close_with_result=True
> ../lib/python3.6/site-packages/sqlalchemy/orm/query.py:3426: in _get_bind_args
> mapper=self._bind_mapper(), clause=querycontext.statement, **kw
> ../lib/python3.6/site-packages/sqlalchemy/orm/query.py:3404: in
> _connection_from_session
> conn = self.session.connection(**kw)
> ../lib/python3.6/site-packages/sqlalchemy/orm/session.py:1133: in connection
> execution_options=execution_options,
> ../lib/python3.6/site-packages/sqlalchemy/orm/session.py:1139: in
> _connection_for_bind
> engine, execution_options
> ../lib/python3.6/site-packages/sqlalchemy/orm/session.py:432: in
> _connection_for_bind
> conn = bind._contextual_connect()
> ../lib/python3.6/site-packages/sqlalchemy/engine/base.py:2242: in
> _contextual_connect
> self._wrap_pool_connect(self.pool.connect, None),
> ../lib/python3.6/site-packages/sqlalchemy/engine/base.py:2280: in
> _wrap_pool_connect
> e, dialect, self
> ../lib/python3.6/site-packages/sqlalchemy/engine/base.py:1547: in
> _handle_dbapi_exception_noconnection
> util.raise_from_cause(sqlalchemy_exception, exc_info)
> ../lib/python3.6/site-packages/sqlalchemy/util/compat.py:398: in
> raise_from_cause
> reraise(type(exception), exception, tb=exc_tb, cause=cause)
> ../lib/python3.6/site-packages/sqlalchemy/util/compat.py:152: in reraise
> raise value.with_traceback(tb)
> ../lib/python3.6/site-packages/sqlalchemy/engine/base.py:2276: in
> _wrap_pool_connect
> return fn()
> ../lib/python3.6/site-packages/sqlalchemy/pool/base.py:363: in connect
> return _ConnectionFairy._checkout(self)
> ../lib/python3.6/site-packages/sqlalchemy/pool/base.py:773: in _checkout
> fairy = _ConnectionRecord.checkout(pool)
> ../lib/python3.6/site-packages/sqlalchemy/pool/base.py:492: in checkout
> rec = pool._do_get()
> ../lib/python3.6/site-packages/sqlalchemy/pool/impl.py:139: in _do_get
> self._dec_overflow()
> ../lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py:68: in __exit__
> compat.reraise(exc_type, exc_value, exc_tb)
> ../lib/python3.6/site-packages/sqlalchemy/util/compat.py:153: in reraise
> raise value
> ../lib/python3.6/site-packages/sqlalchemy/pool/impl.py:136: in _do_get
> return self._create_connection()
> ../lib/python3.6/site-packages/sqlalchemy/pool/base.py:308: in
> _create_connection
> return _ConnectionRecord(self)
> ../lib/python3.6/site-packages/sqlalchemy/pool/base.py:437: in __init__
> self.__connect(first_connect_check=True)
> ../lib/python3.6/site-packages/sqlalchemy/pool/base.py:652: in __connect
> connection = pool._invoke_creator(self)
> ../lib/python3.6/site-packages/sqlalchemy/engine/strategies.py:114: in connect
> return dialect.connect(*cargs, **cparams)
> ../lib/python3.6/site-packages/sqlalchemy/engine/default.py:489: in connect
> return self.dbapi.connect(*cargs, **cparams)
> ../lib/python3.6/site-packages/psycopg2/__init__.py:126: in connect
> conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
> E sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) could not
> translate host name "postgres" to address: Name or service not known
> {code}
>
> However the non-subdag version passes the test:
> {code:python}
> from datetime import datetime, timedelta
> from airflow import DAG
> from airflow.hooks.postgres_hook import PostgresHook
> from airflow.operators.dummy_operator import DummyOperator
> from airflow.operators.python_operator import PythonOperator
> default_args = {
> "owner": "airflow",
> "depends_on_past": False,
> "start_date": datetime(2020, 2, 2),
> "email": ["[email protected]"],
> "email_on_failure": False,
> "email_on_retry": False,
> "retries": 1,
> "retry_delay": timedelta(minutes=5),
> }
> def get_data(**kwargs):
> """
> Returns DB data as a Pandas DataFrame
> """
> df = PostgresHook(
> postgres_conn_id=kwargs['postgres_conn_id']
> ).get_pandas_df("select 1;")
> return df
> dag = DAG("test-hook", default_args=default_args, schedule_interval=None)
> start = DummyOperator(
> task_id='kick_off',
> dag=dag
> )
> run_query = PythonOperator(
> task_id=f'get_data',
> python_callable=get_data,
> op_kwargs={
> 'postgres_conn_id': 'postgres_conn'
> },
> provide_context=True,
> dag=dag
> )
> start >> run_query
> {code}
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)