[
https://issues.apache.org/jira/browse/AIRFLOW-6785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Darwin Yip updated AIRFLOW-6785:
--------------------------------
Affects Version/s: 1.10.4
1.10.5
1.10.6
1.10.7
1.10.8
> DagBag tries to run hook inside SubDagOperator
> ----------------------------------------------
>
> Key: AIRFLOW-6785
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6785
> Project: Apache Airflow
> Issue Type: Bug
> Components: DAG, hooks, operators
> Affects Versions: 1.10.4, 1.10.5, 1.10.6, 1.10.7, 1.10.8, 1.10.9
> Reporter: Darwin Yip
> Priority: Major
>
> The following worked in 1.10.1 but fails in 1.10.9. It seems that DagBag
> tries to execute the hook inside the SubDagOperator while loading the DAG,
> which in turn tries to connect to the database.
> Assuming the connection is defined through this environment variable:
> {code:bash}
> AIRFLOW_CONN_POSTGRES_CONN=postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
> {code}
>
> {code:python}
> import unittest
>
> from airflow.models import DagBag
>
>
> class TestDags(unittest.TestCase):
>     """
>     Generic tests that all DAGs in the repository should be able to pass.
>     """
>     LOAD_SECOND_THRESHOLD = 2
>
>     def setUp(self):
>         self.dagbag = DagBag()
>
>     def test_dagbag_import(self):
>         """
>         Verify that Airflow will be able to import all DAGs in the repository.
>         """
>         self.assertFalse(
>             len(self.dagbag.import_errors),
>             'There should be no DAG failures. Got: {}'.format(
>                 self.dagbag.import_errors
>             )
>         )
> {code}
>
>
> {code:python}
> from datetime import datetime, timedelta
> from airflow import DAG
> from airflow.hooks.postgres_hook import PostgresHook
> from airflow.operators.dummy_operator import DummyOperator
> from airflow.operators.python_operator import PythonOperator
> from airflow.operators.subdag_operator import SubDagOperator
> default_args = {
>     "owner": "airflow",
>     "depends_on_past": False,
>     "start_date": datetime(2020, 2, 2),
>     "email": ["[email protected]"],
>     "email_on_failure": False,
>     "email_on_retry": False,
>     "retries": 1,
>     "retry_delay": timedelta(minutes=5),
> }
>
> def get_data(**kwargs):
>     df = PostgresHook(
>         postgres_conn_id=kwargs['postgres_conn_id']
>     ).get_pandas_df("select 1;")
>     return df
>
> def subdag(parent_dag_name, child_dag_name, args):
>     dag_subdag = DAG(
>         dag_id='%s.%s' % (parent_dag_name, child_dag_name),
>         default_args=args,
>         schedule_interval=None,
>     )
>     run_query = PythonOperator(
>         task_id=f'get_data_sub',
>         python_callable=get_data,
>         op_kwargs={
>             'postgres_conn_id': 'postgres_conn'
>         },
>         provide_context=True,
>         dag=dag_subdag
>     )
>     return dag_subdag
>
> dag = DAG("test-hook-sub", default_args=default_args, schedule_interval=None)
> start = DummyOperator(
>     task_id='kick_off',
>     dag=dag
> )
> section_1 = SubDagOperator(
>     task_id='section-1',
>     subdag=subdag("test-hook-sub", 'section-1', default_args),
>     dag=dag,
> )
> start >> section_1
> {code}
> Error:
> {code:python}
> psycopg2.OperationalError: could not translate host name "postgres" to
> address: Name or service not known
> {code}
>
> However, the non-subdag version passes the test:
> {code:python}
> from datetime import datetime, timedelta
> from airflow import DAG
> from airflow.hooks.postgres_hook import PostgresHook
> from airflow.operators.dummy_operator import DummyOperator
> from airflow.operators.python_operator import PythonOperator
> default_args = {
>     "owner": "airflow",
>     "depends_on_past": False,
>     "start_date": datetime(2020, 2, 2),
>     "email": ["[email protected]"],
>     "email_on_failure": False,
>     "email_on_retry": False,
>     "retries": 1,
>     "retry_delay": timedelta(minutes=5),
> }
>
> def get_data(**kwargs):
>     """
>     Returns DB data as a Pandas DataFrame
>     """
>     df = PostgresHook(
>         postgres_conn_id=kwargs['postgres_conn_id']
>     ).get_pandas_df("select 1;")
>     return df
>
> dag = DAG("test-hook", default_args=default_args, schedule_interval=None)
> start = DummyOperator(
>     task_id='kick_off',
>     dag=dag
> )
> run_query = PythonOperator(
>     task_id=f'get_data',
>     python_callable=get_data,
>     op_kwargs={
>         'postgres_conn_id': 'postgres_conn'
>     },
>     provide_context=True,
>     dag=dag
> )
> start >> run_query
> {code}
>
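
Note for anyone reproducing this: the host name in the quoted error ("postgres") is the same as the host component of the AIRFLOW_CONN_POSTGRES_CONN URI in the report. The sketch below only splits that URI with Python's standard library so the components are visible. It is not Airflow's own connection parsing, the helper name describe_conn_uri is made up for illustration, and it does not show which code path actually opens the connection.

```python
from urllib.parse import urlsplit


def describe_conn_uri(uri):
    """Split a connection URI into its parts.

    Illustrative helper only; this is not an Airflow API.
    """
    parts = urlsplit(uri)
    return {
        "scheme": parts.scheme,
        "user": parts.username,
        "host": parts.hostname,
        "port": parts.port,
        "database": parts.path.lstrip("/"),
    }


# URI copied from the report quoted above.
info = describe_conn_uri(
    "postgresql+psycopg2://airflow:airflow@postgres:5432/airflow"
)
print(info["host"], info["port"])
```

If "postgres" is a name that only resolves in some environments (an assumption; the report does not say where the test was run), any code that tries to connect while the DAG file is being loaded would fail with this kind of error.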
--
This message was sent by Atlassian Jira
(v8.3.4#803005)