Krishna Garapati created AIRFLOW-3873:
-----------------------------------------
Summary: Issue with DAG dependency using ExternalTaskSensor
Key: AIRFLOW-3873
URL: https://issues.apache.org/jira/browse/AIRFLOW-3873
Project: Apache Airflow
Issue Type: Bug
Components: DAG
Affects Versions: 1.10.1
Environment: Running on Redhat Linux box on which Airflow is Installed.
Reporter: Krishna Garapati
Fix For: 1.10.2
I have two DAGs Created and want to set dependencies between them using
externalTaskSensor as shown below. I am getting the error as "Broken DAG:
[/data1/airflow/dags/testdagdependency.py] No module named snakebite.client".
Please help me on this.
==================================================================
*DAG 1:*
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'Krishna Garapati',
'depends_on_past': False,
'start_date': datetime(2019, 2, 9),
'email': ['[email protected]'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 0,
'retry_delay': timedelta(minutes=5)
#'queue': 'finance-ingestion',
# 'run_as_user': 'sptfinactmodel'
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('pythontest',default_args=default_args,schedule_interval='27 2 * * *')
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='pythontest',
bash_command='\{{"python
/preprod/finance/financedatastagedev/scripts/airflowtest/hive/test.py"}}',
dag=dag)
t2.set_upstream(t1)
==========================================================
*DAG 2 ( Keeping dependency on DAG1)*
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.operators.sensors import ExternalTaskSensor
default_args = {
'owner': 'Krishna Garapati',
'depends_on_past': False,
'start_date': datetime(2019, 2, 11),
'email': ['[email protected]'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 0,
'retry_delay': timedelta(minutes=5)
#'queue': 'finance-ingestion',
# 'run_as_user': 'sptfinactmodel'
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('testdagdependency',default_args=default_args,schedule_interval='27
15 * * *')
wait_for_pythontest = ExternalTaskSensor(
task_id='wait_for_pythontest',
external_dag_id='pythontest',
external_task_id='pythontest',
execution_delta=None, # Same day as today
dag=dag)
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='testdependency',
bash_command='\{{"python
/preprod/finance/financedatastagedev/scripts/airflowtest/hive/test.py"}}',
dag=dag)
wait_for_pythontest >> testdagdependency
t2.set_upstream(t1)
=====================================================
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)