Hi,
I'm using Apache Airflow 1.10.0 and I'm trying to dynamically generate some
tasks in my DAG based on files that are in the dags directory. The problem is
that I don't see these tasks in the UI; I only see the 'start' dummy operator.
If I run 'airflow list_tasks workflow', they are listed. Thoughts?
Here is how I'm generating the tasks:
def create_snowflake_operator(file, dag, snowflake_connection):
    """Build a SnowflakeOperator for one SQL file and attach it to ``dag``.

    The task id is ``create_`` followed by ``file`` with every ``/`` turned
    into ``_`` and every ``.sql`` occurrence removed. The derived name is
    printed as ``TASK_ID <name>`` before the operator is created.

    Args:
        file: Path of the SQL file; passed to the operator unchanged as ``sql``.
        dag: DAG the new operator is attached to.
        snowflake_connection: Value passed as ``snowflake_conn_id``.

    Returns:
        The newly constructed SnowflakeOperator.
    """
    # Flatten the path into an identifier-like name for the task id.
    task_suffix = file.replace('/', '_').replace('.sql', '')
    print("TASK_ID {}".format(task_suffix))

    operator_kwargs = dict(
        dag=dag,
        task_id='create_{}'.format(task_suffix),
        snowflake_conn_id=snowflake_connection,
        sql=file,
    )
    return SnowflakeOperator(**operator_kwargs)
import os

DAG_NAME = 'create_objects'

# Directory that the relative 'dags/...' patterns below are resolved against.
# glob() resolves a relative pattern against the current working directory of
# whichever process imports this file, so the set of generated tasks would
# otherwise depend on where that process was started. A process started from
# a different directory finds no files and builds only the 'start' task --
# presumably why the tasks are listed by the CLI but missing in the UI.
# NOTE(review): assumes this DAG file sits directly inside the 'dags'
# directory, so its parent directory is the one containing 'dags/' -- confirm
# against the actual layout.
_BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

# One pattern per object kind, in the order the tasks are created.
_SQL_PATTERNS = (
    'dags/snowsql/create/udf/*.sql',
    'dags/snowsql/create/table/*.sql',
    'dags/snowsql/create/view/*.sql',
)

dag = DAG(
    DAG_NAME,
    default_args=args,
    dagrun_timeout=timedelta(hours=2),
    schedule_interval=None,
)

# Single entry point; every generated task is placed downstream of it.
start = DummyOperator(
    dag=dag,
    task_id="start",
)

print("creating snowflake operators")
for pattern in _SQL_PATTERNS:
    for absolute_path in glob(os.path.join(_BASE_DIR, pattern)):
        # Pass on the same 'dags/...'-relative path the original code used,
        # so task ids and the operator's `sql` value keep their old form.
        relative_path = os.path.relpath(absolute_path, _BASE_DIR)
        print("FILE {}".format(relative_path))
        task = create_snowflake_operator(relative_path, dag, 'snowflake_default')
        task.set_upstream(start)
print("done {}".format(start.downstream_task_ids))
Thanks in advance
--
Frank Maritato