My current DAGs work fine. The problem is that we have almost the same logic for the DEV, STG, and PROD environments (only minor parameter changes), so keeping three separate source files causes code redundancy.
I am trying to refactor the current "common DAG logic" into one file and
separate out only the parameters that change depending on the environment.
(A sketch of one way to structure this is appended after the quoted thread
below.)

On Tue, Jun 27, 2017 at 4:18 PM, Gerard Toonstra <[email protected]> wrote:

> you should probably use:
>
> airflow test tutorial print_date 2015-06-01
>
> syntax, as described here:
> https://airflow.incubator.apache.org/tutorial.html
>
> When you execute "python <dagname>.py", it picks up code from the __main__
> section and that's likely different. I'm not sure though.
>
>
> On Tue, Jun 27, 2017 at 8:38 AM, Ashika Umanga Umagiliya <
> [email protected]> wrote:
>
> > Thanks for the reply.
> >
> > When I execute "python cdna_daily_dev.py list_tasks" it gives the correct
> > output, as expected. Does this mean that my DAG is valid?
> >
> > [bisuser@ins-server dags]$ python cdna_daily_dev.py list_tasks
> > [2017-06-27 15:34:29,785] {__init__.py:36} INFO - Using executor CeleryExecutor
> > [2017-06-27 15:34:30,459] {base_hook.py:53} INFO - Using connection to: ins-server.local
> >
> > d_daily_job_finished
> > d_daily_job_start
> > ...
> > pv_proc_ichiba_device_preference_rat
> >
> > Also, I added some airflow imports in "cdna_daily_dev.py" so that the file
> > is detected as a DAG. Still the DAGs are not displayed in the UI.
> >
> > ====================
> > cdna_daily_common.py
> > ====================
> > global airflow_host_port
> > global env
> > global alert_email
> > global spdb_sync_prefix
> > global post_validate_prefix
> > global schedule_interval
> > global dag_name  # PROD: cdna_daily_prd
> > global job_execution_prefix
> >
> > from airflow import DAG
> > from datetime import datetime, timedelta
> > from airflow.contrib.hooks import SSHHook
> > from airflow.operators import EmailOperator
> > from airflow.operators.bash_operator import BashOperator
> > from airflow.contrib.operators import SSHExecuteOperator
> > from airflow.operators.python_operator import PythonOperator
> >
> > global dag
> >
> > def initDAG():
> >     # define SSHHook
> >     sshHook = SSHHook()
> >     global dag
> >     dag = DAG(
> >         dag_id=dag_name,
> >         default_args=default_args,
> >         schedule_interval="0 2 * * *"
> >     )
> >     ..create rest of the DAG
> >
> > def execDAG():
> >     global dag
> >     initDAG()
> >     dag.cli()
> >
> > ==================
> > cdna_daily_dev.py
> > ==================
> > from airflow import DAG
> > import cdna_daily_common
> > cdna_daily_common.airflow_host_port='hostnme.jp.local:9553'
> > cdna_daily_common.env='dev'
> > cdna_daily_common.alert_email='[email protected]'
> > cdna_daily_common.spdb_sync_prefix='echo SPDBSync'
> > cdna_daily_common.post_validate_prefix='echo PostVal'
> > cdna_daily_common.schedule_interval='0 2 * * *'
> > cdna_daily_common.dag_name='cdna_daily_dev_v2'
> > cdna_daily_common.job_execution_prefix="python /home/bisuser/bo_dir/repos/customer_dna/py/job_execution.py -j"
> >
> > if __name__ == "__main__":
> >     cdna_daily_common.execDAG()
> >
> >
> > On Tue, Jun 27, 2017 at 3:18 PM, Gerard Toonstra <[email protected]> wrote:
> >
> > > For airflow to find dags, a .py file is read. The file should contain
> > > either "DAG" or "airflow" somewhere to be considered a potential dag file.
> > > Then there are some additional rules about whether this actually gets
> > > scheduled or not. The log files for the dag file processors are not the
> > > same as the main scheduler's, so you could look for those explicitly and
> > > see if there are reasons they get rejected.
> > >
> > > You can find this code in "dag_processing.list_by_file_path()", which
> > > detects potential dags from files it finds. This creates a list of files
> > > that should be considered, which potentially have a dag for scheduling.
> > >
> > > Then there's models.DagBag.process_file, which is called later, which
> > > actually attempts to parse dags and create Dag classes from those.
> > >
> > > From your code, I think you've already noticed this page on the subject:
> > > https://cwiki.apache.org/confluence/display/AIRFLOW/Common+Pitfalls
> > >
> > > Hope that helps!
> > >
> > > Rgds,
> > >
> > > Gerard
> > >
> > >
> > > On Tue, Jun 27, 2017 at 7:09 AM, Ashika Umanga Umagiliya <
> > > [email protected]> wrote:
> > >
> > > > Greetings,
> > > >
> > > > We are trying to extract the common DAG creation code into a single
> > > > source file and create a separate file for each environment, importing
> > > > this "common logic".
> > > >
> > > > "cdna_daily_common.py" has the common DAG creation code. The files for
> > > > the different environments are "cdna_daily_dev.py", "cdna_daily_stg.py", etc.
> > > >
> > > > However, when I copy these files into the "dags" folder, they don't
> > > > show up in the Airflow UI. How can I make these DAGs visible in the
> > > > Airflow UI while following this coding convention?
> > > >
> > > > ====================
> > > > cdna_daily_common.py
> > > > ====================
> > > > global airflow_host_port
> > > > global env
> > > > global alert_email
> > > > global spdb_sync_prefix
> > > > global post_validate_prefix
> > > > global schedule_interval
> > > > global dag_name  # PROD: cdna_daily_prd
> > > > global job_execution_prefix
> > > >
> > > > global dag
> > > >
> > > > def initDAG():
> > > >     # define SSHHook
> > > >     sshHook = SSHHook()
> > > >     global dag
> > > >     dag = DAG(
> > > >         dag_id=dag_name,
> > > >         default_args=default_args,
> > > >         schedule_interval="0 2 * * *"
> > > >     )
> > > >     ..create rest of the DAG
> > > >
> > > > def execDAG():
> > > >     initDAG()
> > > >     dag.cli()
> > > >
> > > > ==================
> > > > cdna_daily_dev.py
> > > > ==================
> > > > import cdna_daily_common
> > > > cdna_daily_common.airflow_host_port='hostnme.jp.local:9553'
> > > > cdna_daily_common.env='dev'
> > > > cdna_daily_common.alert_email='[email protected]'
> > > > cdna_daily_common.spdb_sync_prefix='echo SPDBSync'
> > > > cdna_daily_common.post_validate_prefix='echo PostVal'
> > > > cdna_daily_common.schedule_interval='0 2 * * *'
> > > > cdna_daily_common.dag_name='cdna_daily_dev_v2'
> > > > cdna_daily_common.job_execution_prefix="python /home/bisuser/bo_dir/repos/customer_dna/py/job_execution.py -j"
> > > >
> > > > if __name__ == "__main__":
> > > >     cdna_daily_common.execDAG()
> >
> > --
> > Umanga
> > http://jp.linkedin.com/in/umanga
> > http://umanga.ifreepages.com

--
Umanga
http://jp.linkedin.com/in/umanga
http://umanga.ifreepages.com
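
For reference, here is one way to do the refactoring described at the top of
this message while keeping the DAG discoverable: the common module exposes a
factory function, and each small per-environment file in the dags folder calls
it at import time and binds the result to a module-level name. The scheduler's
DagBag only collects DAG objects that exist at module level, which is why
building the DAG inside execDAG() under __main__ leaves the UI empty. This is
a minimal sketch against the Airflow 1.8-era imports used in the thread; the
default_args values and the placeholder task are assumptions, not the real
pipeline.

====================
cdna_daily_common.py
====================
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator


def create_dag(dag_name, env, alert_email, schedule_interval, job_execution_prefix):
    """Build and return a DAG from environment-specific parameters."""
    # Assumed defaults for illustration only; replace with the real values.
    default_args = {
        'owner': 'bisuser',
        'start_date': datetime(2017, 6, 1),
        'email': [alert_email],
        'email_on_failure': True,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }
    dag = DAG(
        dag_id=dag_name,
        default_args=default_args,
        schedule_interval=schedule_interval,
    )
    # ...create the rest of the DAG here; one placeholder task for illustration.
    # env and the other parameters can drive task arguments as needed.
    BashOperator(
        task_id='d_daily_job_start',
        bash_command='{} d_daily_job_start'.format(job_execution_prefix),
        dag=dag,
    )
    return dag

==================
cdna_daily_dev.py
==================
from airflow import DAG  # keeps "airflow"/"DAG" in the file so it is scanned as a dag file

import cdna_daily_common

# The DAG must be bound at module level (not only inside __main__),
# otherwise DagBag.process_file never sees it and nothing shows up in the UI.
dag = cdna_daily_common.create_dag(
    dag_name='cdna_daily_dev_v2',
    env='dev',
    alert_email='[email protected]',
    schedule_interval='0 2 * * *',
    job_execution_prefix='python /home/bisuser/bo_dir/repos/customer_dna/py/job_execution.py -j',
)

cdna_daily_stg.py and cdna_daily_prd.py would look the same with their own
parameter values. If cdna_daily_common.py itself sits in the dags folder it
will also be scanned, but since it defines no module-level DAG object it
simply contributes nothing.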

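To see what the scheduler would actually pick up (the models.DagBag.process_file
path Gerard mentions), the dags folder can also be parsed directly from a small
script. A sketch, assuming an Airflow 1.x install; the script name and the
dags-folder path are placeholders to adjust:

===============
check_dagbag.py
===============
from airflow.models import DagBag

# Parse the folder the same way the scheduler does (assumed path).
dag_bag = DagBag(dag_folder='/home/bisuser/airflow/dags')

# Files that raised exceptions while being imported, with tracebacks.
print('import errors:', dag_bag.import_errors)

# dag_ids that were collected and would be listed in the UI.
print('dags found:', sorted(dag_bag.dags.keys()))

If cdna_daily_dev_v2 is missing from this output even though
"python cdna_daily_dev.py list_tasks" works, that points to the DAG being
built only in the __main__ path rather than at import time.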