This isn't needed: each task is added to the DAG at the moment it is constructed with dag=dag, so the DAG object itself keeps track of the tasks even when the local variable is overwritten on the next loop iteration.
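For example, a quick sketch with a throwaway DAG (the DAG id and task_ids
here are made up for illustration, not taken from the code further down):

    from datetime import datetime
    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator

    dag = DAG('reference_check', start_date=datetime(2018, 1, 1),
              schedule_interval=None)

    # No reference to the operators is kept; passing dag=dag is enough
    # for the DAG to register each task as it is constructed.
    for i in range(3):
        DummyOperator(dag=dag, task_id='task_{}'.format(i))

    print(dag.task_ids)  # contains 'task_0', 'task_1', 'task_2'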
-ash

> On 14 Sep 2018, at 18:30, Alex Tronchin-James 949-412-7220
> <[email protected]> wrote:
>
> Don't you need to preserve the task objects? Your implementation overwrites
> each one with its successor, so only the last task would be kept, despite
> your print statements. Try building a list or dict of tasks like:
>
> tasks = []  # only at the top
> for file in glob('dags/snowsql/create/udf/*.sql'):
>     print("FILE {}".format(file))
>     tasks.append(
>         create_snowflake_operator(file, dag, 'snowflake_default')
>     )
>     tasks[-1].set_upstream(start)
>
> On Fri, Sep 14, 2018 at 17:20 Frank Maritato
> <[email protected]> wrote:
>
>> Ok, my mistake. I thought that command was querying the server for its
>> information and not just looking in a directory relative to where it is
>> being run. I have it working now. Thanks Chris and Sai!
>>
>> On 9/14/18, 9:58 AM, "Chris Palmer" <[email protected]> wrote:
>>
>> The relative paths might work from wherever you are invoking 'airflow
>> list_tasks', but that doesn't mean they work from wherever the webserver
>> is parsing the dags from.
>>
>> Does running 'airflow list_tasks' from some other working directory work?
>>
>> On Fri, Sep 14, 2018 at 12:35 PM Frank Maritato
>> <[email protected]> wrote:
>>
>>> Do you mean give the full path to the files? The relative path I'm using
>>> definitely works. When I type airflow list_dags, I can see from the
>>> print statements that the glob is finding my sql files and creating the
>>> snowflake operators.
>>>
>>> airflow list_tasks workflow also lists all the operators I'm creating.
>>> I'm just not seeing them in the ui.
>>>
>>> On 9/14/18, 9:10 AM, "Sai Phanindhra" <[email protected]> wrote:
>>>
>>> Hi Frank,
>>> Can you try giving global paths?
>>>
>>> On Fri 14 Sep, 2018, 21:35 Frank Maritato,
>>> <[email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm using apache airflow 1.10.0 and I'm trying to dynamically generate
>>>> some tasks in my dag based on files that are in the dags directory.
>>>> The problem is, I don't see these tasks in the ui, I just see the
>>>> 'start' dummy operator. If I type 'airflow list_tasks workflow', they
>>>> are listed. Thoughts?
>>>>
>>>> Here is how I'm generating the tasks:
>>>>
>>>> def create_snowflake_operator(file, dag, snowflake_connection):
>>>>     file_repl = file.replace('/', '_')
>>>>     file_repl = file_repl.replace('.sql', '')
>>>>     print("TASK_ID {}".format(file_repl))
>>>>     return SnowflakeOperator(
>>>>         dag=dag,
>>>>         task_id='create_{}'.format(file_repl),
>>>>         snowflake_conn_id=snowflake_connection,
>>>>         sql=file
>>>>     )
>>>>
>>>> DAG_NAME = 'create_objects'
>>>> dag = DAG(
>>>>     DAG_NAME,
>>>>     default_args=args,
>>>>     dagrun_timeout=timedelta(hours=2),
>>>>     schedule_interval=None,
>>>> )
>>>>
>>>> start = DummyOperator(
>>>>     dag=dag,
>>>>     task_id="start",
>>>> )
>>>>
>>>> print("creating snowflake operators")
>>>>
>>>> for file in glob('dags/snowsql/create/udf/*.sql'):
>>>>     print("FILE {}".format(file))
>>>>     task = create_snowflake_operator(file, dag, 'snowflake_default')
>>>>     task.set_upstream(start)
>>>>
>>>> for file in glob('dags/snowsql/create/table/*.sql'):
>>>>     print("FILE {}".format(file))
>>>>     task = create_snowflake_operator(file, dag, 'snowflake_default')
>>>>     task.set_upstream(start)
>>>>
>>>> for file in glob('dags/snowsql/create/view/*.sql'):
>>>>     print("FILE {}".format(file))
>>>>     task = create_snowflake_operator(file, dag, 'snowflake_default')
>>>>     task.set_upstream(start)
>>>>
>>>> print("done {}".format(start.downstream_task_ids))
>>>>
>>>> Thanks in advance
>>>> --
>>>> Frank Maritato
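For what it's worth, the relative-path issue Chris and Sai describe above is
usually avoided by anchoring the glob to the DAG file's own directory instead
of to whatever the process's working directory happens to be. A rough sketch,
assuming this DAG file sits directly under the dags/ folder so the sql files
live under snowsql/ next to it:

    import os
    from glob import glob

    # Directory containing this DAG file, independent of the cwd the
    # webserver or scheduler happens to parse from.
    DAG_DIR = os.path.dirname(os.path.abspath(__file__))

    for file in glob(os.path.join(DAG_DIR, 'snowsql/create/udf/*.sql')):
        print("FILE {}".format(file))
        task = create_snowflake_operator(file, dag, 'snowflake_default')
        task.set_upstream(start)

Since file is now an absolute path, the task_id derived from it inside
create_snowflake_operator will include the leading directories, and the sql
argument changes accordingly; os.path.relpath(file, DAG_DIR) applied first is
one way to keep both closer to the original relative form if that matters.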
