Greetings,

I managed to enable my DAG in the UI after refactoring my code as below.
However, the on/off toggle switch is missing for this DAG.
Also, when I trigger/execute this DAG, it only finishes the first task and
doesn't continue with the rest of the pipeline. (I assume this is because of
the missing toggle button?)

Any tips?
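
For reference, here is a sketch of the basic checks I intend to run (standard
Airflow 1.8-era CLI; the dag_id is the one configured below, the task id is
the first one printed by list_tasks further down, and the date is only an
example):

airflow list_dags                     # is the DAG in the scheduler's DagBag at all?
airflow unpause cdna_daily_dev_v2     # new DAGs start paused if dags_are_paused_at_creation=True
airflow test cdna_daily_dev_v2 d_daily_job_start 2017-06-27   # run one task in isolation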

==========================
cdna_daily_dev.py
==========================
from airflow import DAG
import cdna_daily_common
cdna_daily_common.airflow_host_port='adsadmin101z.prod.jp.local:9553'
cdna_daily_common.env='dev'
cdna_daily_common.alert_email='[email protected]'  # [email protected]
cdna_daily_common.schedule_interval='@once'
cdna_daily_common.dag_name='cdna_daily_dev_v2'
cdna_daily_common.dag_owner='bo.yu'
cdna_daily_common.spdb_sync_prefix='echo SPDBSync'
cdna_daily_common.post_validate_prefix='echo PostVal'
cdna_daily_common.job_execution_prefix="python /home/bisuser/bo_dir/repos/customer_dna/py/job_execution.py -j"
cdna_daily_common.genre_change_monitor_prefix="python /usr/local/bis/app/customer_dna/tool/genre_monitor/genre_change_monitor.py"

dag = cdna_daily_common.initDAG()

if __name__ == "__main__":
    dag.cli()
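
To see whether this file parses the same way the scheduler sees it, a rough
sketch of a DagBag check (the dags folder path is only illustrative; DagBag is
the same class Gerard mentions below via models.DagBag.process_file):

# run from a python shell on the Airflow host
from airflow.models import DagBag
bag = DagBag('/home/bisuser/airflow/dags')   # path is only an example
print(bag.import_errors)                     # reasons a file was rejected
print('cdna_daily_dev_v2' in bag.dags)       # True if the DAG was picked up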




==========================
cdna_daily_common.py
==========================
global airflow_host_port
global env
global alert_email
global spdb_sync_prefix
global post_validate_prefix
global dag_owner
import subprocess
import logging
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.hooks import SSHHook
from airflow.operators import EmailOperator
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators import SSHExecuteOperator
from airflow.operators.python_operator import PythonOperator



def initDAG():
    global dag_name
    global post_validate_prefix
    global spdb_sync_prefix
    global schedule_interval
    global job_execution_prefix
    global dag_owner
    global genre_change_monitor_prefix
    global alert_email

    # define SSHHook
    sshHook = SSHHook()
    # ...

    # DAG setting
    default_args = {
        'owner': dag_owner,
        'depends_on_past': False,
        'start_date': datetime(2017, 3, 16),
        'email': [alert_email],
        'email_on_failure': True,
        'email_on_retry': False,
        'retries': 5,
        'retry_delay': timedelta(minutes=5),
        'on_failure_callback': on_failure_callback,
        'on_success_callback': on_success
    }

    # dag_id and schedule_interval will be replaced after review
    # schedule_interval="0 4 * * *" runs every day at 04:00
    dag = DAG(
        dag_id=dag_name,
        default_args=default_args,
        schedule_interval=schedule_interval
    )
    # ... create the rest of the DAG (tasks) ...
    return dag
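
As an aside, the module-level globals make the parse depend on the env file
setting every name before initDAG() runs. Below is a purely illustrative
variant of the same split that passes the settings as parameters instead (the
names and values are just the ones from the dev config above):

# cdna_daily_common.py (sketch)
from datetime import datetime, timedelta
from airflow import DAG

def init_dag(dag_name, schedule_interval, dag_owner, alert_email):
    default_args = {
        'owner': dag_owner,
        'depends_on_past': False,
        'start_date': datetime(2017, 3, 16),
        'email': [alert_email],
        'email_on_failure': True,
        'email_on_retry': False,
        'retries': 5,
        'retry_delay': timedelta(minutes=5),
    }
    dag = DAG(dag_id=dag_name,
              default_args=default_args,
              schedule_interval=schedule_interval)
    # ... create the rest of the DAG (tasks), as in initDAG() above ...
    return dag

# cdna_daily_dev.py (sketch)
dag = init_dag(dag_name='cdna_daily_dev_v2',
               schedule_interval='@once',
               dag_owner='bo.yu',
               alert_email='[email protected]')
# keep 'dag' bound at module level so the scheduler's DagBag finds it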

On Tue, Jun 27, 2017 at 4:18 PM, Gerard Toonstra <[email protected]>
wrote:

> you should probably use:
>
> airflow test tutorial print_date 2015-06-01
>
> syntax, as described here:
> https://airflow.incubator.apache.org/tutorial.html
>
> When you execute "python <dagname>.py", it picks up code from the __main__
> section, and that's likely different.  I'm not sure though.
>
>
> On Tue, Jun 27, 2017 at 8:38 AM, Ashika Umanga Umagiliya <
> [email protected]> wrote:
>
> > Thanks for the reply.
> >
> > When I execute "python cdna_daily_dev.py list_tasks", it gives the correct
> > output, as expected.
> > Does this mean that my DAG is valid?
> >
> >
> > [bisuser@ins-server dags]$ python cdna_daily_dev.py list_tasks
> > [2017-06-27 15:34:29,785] {__init__.py:36} INFO - Using executor
> > CeleryExecutor
> > [2017-06-27 15:34:30,459] {base_hook.py:53} INFO - Using connection to:
> > ins-server.local
> >
> > d_daily_job_finished
> > d_daily_job_start
> > ...
> > pv_proc_ichiba_device_preference_rat
> >
> >
> >
> >
> >
> > Also, I added some airflow imports in "cdna_daily_dev.py" so that the file
> > is detected as a DAG. Still, the DAGs are not displayed in the UI.
> >
> >
> >
> >
> > ====================
> > cdna_daily_common.py
> > ====================
> > global airflow_host_port
> > global env
> > global alert_email
> > global spdb_sync_prefix
> > global post_validate_prefix
> > global schedule_interval
> > global dag_name#PROD cdna_daily_prd
> > global job_execution_prefix
> > from airflow import DAG
> > from datetime import datetime, timedelta
> > from airflow.contrib.hooks import SSHHook
> > from airflow.operators import EmailOperator
> > from airflow.operators.bash_operator import BashOperator
> > from airflow.contrib.operators import SSHExecuteOperator
> > from airflow.operators.python_operator import PythonOperator
> >
> > global dag
> >
> > def initDAG():
> >     # define SSHHook
> >     sshHook = SSHHook()
> >     global dag
> >     dag = DAG(
> >         dag_id=dag_name,
> >         default_args=default_args,
> >         schedule_interval="0 2 * * *"
> >     )
> >     # ... create the rest of the DAG (tasks) ...
> >
> >
> > def execDAG():
> >     global dag
> >     initDAG()
> >     dag.cli()
> >
> >
> >
> >
> > ==================
> > cdna_daily_dev.py
> > ==================
> > from airflow import DAG
> > import cdna_daily_common
> > cdna_daily_common.airflow_host_port='hostnme.jp.local:9553'
> > cdna_daily_common.env='dev'
> > cdna_daily_common.alert_email='[email protected]'
> > cdna_daily_common.spdb_sync_prefix='echo SPDBSync'
> > cdna_daily_common.post_validate_prefix='echo PostVal'
> > cdna_daily_common.schedule_interval='0 2 * * *'
> > cdna_daily_common.dag_name='cdna_daily_dev_v2'
> > cdna_daily_common.job_execution_prefix="python /home/bisuser/bo_dir/repos/customer_dna/py/job_execution.py -j"
> >
> > if __name__ == "__main__":
> >     cdna_daily_common.execDAG();
> >
> >
> > On Tue, Jun 27, 2017 at 3:18 PM, Gerard Toonstra <[email protected]>
> > wrote:
> >
> > > For airflow to find dags, a .py file is read. The file should contain
> > > either "DAG" or "airflow" somewhere to be considered a potential dag
> > > file. Then there are some additional rules for whether this actually
> > > gets scheduled or not. The log files for the dag file processors are
> > > not the same as the main scheduler's, so you could look for those
> > > explicitly and see if there are reasons they get rejected.
> > >
> > >
> > > You can find this code in "dag_processing.list_by_file_path()", which
> > > detects potential dags from files it finds. This creates a list of
> > > files that should be considered, which potentially have a dag for
> > > scheduling.
> > >
> > >
> > > Then there's models.DagBag.process_file, which is called later and
> > > actually attempts to parse dags and create DAG objects from them.
> > >
> > >
> > > From your code, I think you've already noticed this page on the subject:
> > > https://cwiki.apache.org/confluence/display/AIRFLOW/Common+Pitfalls
> > >
> > >
> > > Hope that helps!
> > >
> > >
> > > Rgds,
> > >
> > > Gerard
> > >
> > >
> > >
> > >
> > > On Tue, Jun 27, 2017 at 7:09 AM, Ashika Umanga Umagiliya <
> > > [email protected]> wrote:
> > >
> > > > Greetings,
> > > >
> > > > We are trying to extract common DAG creation code into a single source
> > > > file and create a separate file for each environment, importing this
> > > > "common logic".
> > > >
> > > > "cdna_daily_common.py" has the common DAG creation code. The files for
> > > > the different environments are "cdna_daily_dev.py", "cdna_daily_stg.py",
> > > > etc.
> > > >
> > > > However, when I copy these files into the "dags" folder, they don't
> > > > show up in the Airflow UI. How can I make these DAGs visible in the
> > > > Airflow UI while following this coding convention?
> > > >
> > > >
> > > > ====================
> > > > cdna_daily_common.py
> > > > ====================
> > > > global airflow_host_port
> > > > global env
> > > > global alert_email
> > > > global spdb_sync_prefix
> > > > global post_validate_prefix
> > > > global schedule_interval
> > > > global dag_name#PROD cdna_daily_prd
> > > > global job_execution_prefix
> > > >
> > > > global dag
> > > >
> > > > def initDAG():
> > > >     # define SSHHook
> > > >     sshHook = SSHHook()
> > > >     global dag
> > > >     dag = DAG(
> > > >         dag_id=dag_name,
> > > >         default_args=default_args,
> > > >         schedule_interval="0 2 * * *"
> > > >     )
> > > >     # ... create the rest of the DAG (tasks) ...
> > > >
> > > >
> > > > def execDAG():
> > > >     initDAG()
> > > >     dag.cli()
> > > >
> > > >
> > > >
> > > >
> > > > ==================
> > > > cdna_daily_dev.py
> > > > ==================
> > > > import cdna_daily_common
> > > > cdna_daily_common.airflow_host_port='hostnme.jp.local:9553'
> > > > cdna_daily_common.env='dev'
> > > > cdna_daily_common.alert_email='[email protected]'
> > > > cdna_daily_common.spdb_sync_prefix='echo SPDBSync'
> > > > cdna_daily_common.post_validate_prefix='echo PostVal'
> > > > cdna_daily_common.schedule_interval='0 2 * * *'
> > > > cdna_daily_common.dag_name='cdna_daily_dev_v2'
> > > > cdna_daily_common.job_execution_prefix="python /home/bisuser/bo_dir/repos/customer_dna/py/job_execution.py -j"
> > > >
> > > > if __name__ == "__main__":
> > > >     cdna_daily_common.execDAG();
> > > >
> > >
> >
> >
> >
> > --
> > Umanga
> > http://jp.linkedin.com/in/umanga
> > http://umanga.ifreepages.com
> >
>



-- 
Umanga
http://jp.linkedin.com/in/umanga
http://umanga.ifreepages.com
