Greetings, I managed to enable my DAG in the UI after refactoring my code as below. However, the "toggle DAG on/off" switch is missing for this DAG. Also, when I trigger/execute this DAG, it only finishes the first task and doesn't continue with the rest of the pipeline. (I assume this is because of the missing toggle button?)
Any tips?

==========================
cdna_daily_dev.py
==========================
from airflow import DAG
import cdna_daily_common

cdna_daily_common.airflow_host_port = 'adsadmin101z.prod.jp.local:9553'
cdna_daily_common.env = 'dev'
cdna_daily_common.alert_email = '[email protected]'  # [email protected]
cdna_daily_common.schedule_interval = '@once'
cdna_daily_common.dag_name = 'cdna_daily_dev_v2'
cdna_daily_common.dag_owner = 'bo.yu'
cdna_daily_common.spdb_sync_prefix = 'echo SPDBSync'
cdna_daily_common.post_validate_prefix = 'echo PostVal'
cdna_daily_common.job_execution_prefix = "python /home/bisuser/bo_dir/repos/customer_dna/py/job_execution.py -j"
cdna_daily_common.genre_change_monitor_prefix = "python /usr/local/bis/app/customer_dna/tool/genre_monitor/genre_change_monitor.py"

dag = cdna_daily_common.initDAG()

if __name__ == "__main__":
    dag.cli()

==========================
cdna_daily_common.py
==========================
global airflow_host_port
global env
global alert_email
global spdb_sync_prefix
global post_validate_prefix
global dag_owner

import subprocess
import logging
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.hooks import SSHHook
from airflow.operators import EmailOperator
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators import SSHExecuteOperator
from airflow.operators.python_operator import PythonOperator


def initDAG():
    global dag_name
    global post_validate_prefix
    global spdb_sync_prefix
    global schedule_interval
    global job_execution_prefix
    global dag_owner
    global genre_change_monitor_prefix
    global alert_email

    # define SSHHook
    sshHook = SSHHook()
    .
    .
    # DAG settings
    default_args = {
        'owner': dag_owner,
        'depends_on_past': False,
        'start_date': datetime(2017, 3, 16),
        'email': [alert_email],
        'email_on_failure': True,
        'email_on_retry': False,
        'retries': 5,
        'retry_delay': timedelta(minutes=5),
        'on_failure_callback': on_failure_callback,
        'on_success_call': on_success
    }

    # dag_id, schedule_interval will be replaced after review
    # schedule_interval="0 4 * * *" run every
    dag = DAG(
        dag_id=dag_name,
        default_args=default_args,
        schedule_interval=schedule_interval
    )
    .
    .
    return dag
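The ". ." sections above elide the SSH hook setup and the task definitions. Purely as an illustration of how tasks are chained on a DAG so that a run proceeds past the first one, here is a minimal self-contained sketch; the dag_id and commands are made up, and the task IDs only echo the list_tasks output quoted further down, so nothing here is the actual pipeline:

# Illustrative sketch only -- dag_id, commands, and the middle task are invented.
# A run only continues past the first task once downstream dependencies are
# declared on the same dag object.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(dag_id='example_chain',
          start_date=datetime(2017, 3, 16),
          schedule_interval='0 2 * * *')

start = BashOperator(task_id='d_daily_job_start',
                     bash_command='echo start', dag=dag)
sync = BashOperator(task_id='spdb_sync',
                    bash_command='echo SPDBSync', dag=dag)
finish = BashOperator(task_id='d_daily_job_finished',
                      bash_command='echo finished', dag=dag)

start.set_downstream(sync)    # start -> sync
sync.set_downstream(finish)   # sync -> finish

The same wiring can equivalently be declared from the other direction with set_upstream(); the important part is that every task carries dag=dag and has its dependencies set.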
On Tue, Jun 27, 2017 at 4:18 PM, Gerard Toonstra <[email protected]> wrote:

> You should probably use the
>
> airflow test tutorial print_date 2015-06-01
>
> syntax, as described here:
> https://airflow.incubator.apache.org/tutorial.html
>
> When you execute "python <dagname>.py", it picks up code from the __main__
> section, and that's likely different. I'm not sure though.
>
>
> On Tue, Jun 27, 2017 at 8:38 AM, Ashika Umanga Umagiliya <
> [email protected]> wrote:
>
> > Thanks for the reply.
> >
> > When I execute "python cdna_daily_dev.py list_tasks" it gives the correct
> > output, as expected. Does this mean that my DAG is valid?
> >
> > [bisuser@ins-server dags]$ python cdna_daily_dev.py list_tasks
> > [2017-06-27 15:34:29,785] {__init__.py:36} INFO - Using executor CeleryExecutor
> > [2017-06-27 15:34:30,459] {base_hook.py:53} INFO - Using connection to: ins-server.local
> >
> > d_daily_job_finished
> > d_daily_job_start
> > ...
> > pv_proc_ichiba_device_preference_rat
> >
> > Also, I added some airflow imports in "cdna_daily_dev.py" so that the file
> > is detected as a DAG file. Still, the DAGs are not displayed in the UI.
> >
> > ====================
> > cdna_daily_common.py
> > ====================
> > global airflow_host_port
> > global env
> > global alert_email
> > global spdb_sync_prefix
> > global post_validate_prefix
> > global schedule_interval
> > global dag_name  # PROD cdna_daily_prd
> > global job_execution_prefix
> >
> > from airflow import DAG
> > from datetime import datetime, timedelta
> > from airflow.contrib.hooks import SSHHook
> > from airflow.operators import EmailOperator
> > from airflow.operators.bash_operator import BashOperator
> > from airflow.contrib.operators import SSHExecuteOperator
> > from airflow.operators.python_operator import PythonOperator
> >
> > global dag
> >
> > def initDAG():
> >     # define SSHHook
> >     sshHook = SSHHook()
> >     global dag
> >     dag = DAG(
> >         dag_id=dag_name,
> >         default_args=default_args,
> >         schedule_interval="0 2 * * *"
> >     )
> >     ..create rest of the DAG
> >
> > def execDAG():
> >     global dag
> >     initDAG()
> >     dag.cli()
> >
> > ==================
> > cdna_daily_dev.py
> > ==================
> > from airflow import DAG
> > import cdna_daily_common
> >
> > cdna_daily_common.airflow_host_port = 'hostnme.jp.local:9553'
> > cdna_daily_common.env = 'dev'
> > cdna_daily_common.alert_email = '[email protected]'
> > cdna_daily_common.spdb_sync_prefix = 'echo SPDBSync'
> > cdna_daily_common.post_validate_prefix = 'echo PostVal'
> > cdna_daily_common.schedule_interval = '0 2 * * *'
> > cdna_daily_common.dag_name = 'cdna_daily_dev_v2'
> > cdna_daily_common.job_execution_prefix = "python /home/bisuser/bo_dir/repos/customer_dna/py/job_execution.py -j"
> >
> > if __name__ == "__main__":
> >     cdna_daily_common.execDAG();
> >
> >
> > On Tue, Jun 27, 2017 at 3:18 PM, Gerard Toonstra <[email protected]>
> > wrote:
> >
> > > For airflow to find dags, a .py file is read. The file should contain
> > > either "DAG" or "airflow" somewhere to be considered a potential dag file.
> > > Then there are some additional rules about whether this actually gets
> > > scheduled or not. The log files for the dag file processors are not the
> > > same as the main scheduler's, so you could look for those explicitly and
> > > see if there are reasons they get rejected.
> > >
> > > You can find this code in "dag_processing.list_py_file_paths()", which
> > > detects potential dags from the files it finds. This creates a list of
> > > files that should be considered, which potentially have a dag for scheduling.
> > >
> > > Then there's models.DagBag.process_file, which is called later and which
> > > actually attempts to parse dags and create Dag classes from them.
> > >
> > > From your code, I think you've already noticed this page on the subject:
> > > https://cwiki.apache.org/confluence/display/AIRFLOW/Common+Pitfalls
> > >
> > > Hope that helps!
> > >
> > > Rgds,
> > >
> > > Gerard
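A quick way to see what the parsing step described above actually registers is to load the dags folder through DagBag yourself; this is a sketch assuming the 1.x-era airflow.models.DagBag API that the reply refers to:

# Sketch: parse the configured dags_folder the same way the scheduler does.
# DagBag.dags maps dag_id -> DAG, and import_errors records the traceback
# for every file that failed to produce a DAG.
from airflow.models import DagBag

dagbag = DagBag()                      # defaults to the configured dags_folder
print(sorted(dagbag.dags.keys()))      # 'cdna_daily_dev_v2' should appear here
for path, err in dagbag.import_errors.items():
    print(path, err)                   # reasons a file was rejected, if any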
> > > On Tue, Jun 27, 2017 at 7:09 AM, Ashika Umanga Umagiliya <
> > > [email protected]> wrote:
> > >
> > > > Greetings,
> > > >
> > > > We are trying to extract the common DAG creation code into a single
> > > > source file and create a separate file for each environment importing
> > > > this "common logic".
> > > >
> > > > "cdna_daily_common.py" has the common DAG creation code. The files for
> > > > the different environments are like "cdna_daily_dev.py", "cdna_daily_stg.py", etc.
> > > >
> > > > However, when I copy these files into the "dags" folder, they don't show
> > > > up in the Airflow UI. How can I make these DAGs visible in the Airflow UI
> > > > while following this coding convention?
> > > >
> > > > ====================
> > > > cdna_daily_common.py
> > > > ====================
> > > > global airflow_host_port
> > > > global env
> > > > global alert_email
> > > > global spdb_sync_prefix
> > > > global post_validate_prefix
> > > > global schedule_interval
> > > > global dag_name  # PROD cdna_daily_prd
> > > > global job_execution_prefix
> > > >
> > > > global dag
> > > >
> > > > def initDAG():
> > > >     # define SSHHook
> > > >     sshHook = SSHHook()
> > > >     global dag
> > > >     dag = DAG(
> > > >         dag_id=dag_name,
> > > >         default_args=default_args,
> > > >         schedule_interval="0 2 * * *"
> > > >     )
> > > >     ..create rest of the DAG
> > > >
> > > > def execDAG():
> > > >     initDAG()
> > > >     dag.cli()
> > > >
> > > > ==================
> > > > cdna_daily_dev.py
> > > > ==================
> > > > import cdna_daily_common
> > > >
> > > > cdna_daily_common.airflow_host_port = 'hostnme.jp.local:9553'
> > > > cdna_daily_common.env = 'dev'
> > > > cdna_daily_common.alert_email = '[email protected]'
> > > > cdna_daily_common.spdb_sync_prefix = 'echo SPDBSync'
> > > > cdna_daily_common.post_validate_prefix = 'echo PostVal'
> > > > cdna_daily_common.schedule_interval = '0 2 * * *'
> > > > cdna_daily_common.dag_name = 'cdna_daily_dev_v2'
> > > > cdna_daily_common.job_execution_prefix = "python /home/bisuser/bo_dir/repos/customer_dna/py/job_execution.py -j"
> > > >
> > > > if __name__ == "__main__":
> > > >     cdna_daily_common.execDAG();
>
> --
> Umanga
> http://jp.linkedin.com/in/umanga
> http://umanga.ifreepages.com

--
Umanga
http://jp.linkedin.com/in/umanga
http://umanga.ifreepages.com
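For reference, the variant the thread ends up with (the refactored cdna_daily_dev.py quoted at the top of this message) differs from the original listing above in one key way: the DAG object is bound at module scope at import time, rather than only inside execDAG() under __main__, which is where the DagBag looks for it. A minimal sketch using the same names as the thread, with the body of initDAG() and most per-environment settings assumed rather than shown:

# cdna_daily_dev.py -- sketch only; per-environment settings abbreviated
from airflow import DAG            # keeps the file on the DagBag's radar
import cdna_daily_common

cdna_daily_common.dag_name = 'cdna_daily_dev_v2'
cdna_daily_common.schedule_interval = '0 2 * * *'
# ... remaining per-environment settings as in the listings above ...

# Bound at module scope at import time, so the scheduler and UI can discover it.
dag = cdna_daily_common.initDAG()

if __name__ == "__main__":
    dag.cli()                      # still usable for manual CLI testing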
