Apologies if the dev list is not an appropriate place to ask this question, which I've also posted to
https://stackoverflow.com/questions/45227118/airflow-dependency-error-when-instantiating-multiple-tasks-via-a-for-loop.
Thanks in advance!
Aaron
I'm running the DAG below. It imports functions from dash_workers.py (not included yet -- would that be helpful?) and implements those functions as tasks defined via PythonOperator:
from datetime import datetime, timedelta
import os
import sys

import airflow.models as af_models
from airflow.operators.python_operator import PythonOperator

import ds_dependencies

SCRIPT_PATH = os.getenv('CAPONE_DASH_PREPROC_PATH')

if SCRIPT_PATH:
    sys.path.insert(0, SCRIPT_PATH)
    import dash_workers
else:
    print('Define DASH_PREPROC_PATH value in environmental variables')
    sys.exit(1)

default_args = {
    'start_date': datetime(2017, 7, 18),
    'schedule_interval': None
}

DAG = af_models.DAG(
    dag_id='dash_preproc',
    default_args=default_args
)

get_id_creds = PythonOperator(
    task_id='get_id_creds',
    python_callable=dash_workers.get_id_creds,
    provide_context=True,
    dag=DAG)

with open('/tmp/ids.txt', 'r') as infile:
    ids = infile.read().splitlines()

for uid in ids:
    print('Building transactions for {}'.format(uid))
    upload_transactions = PythonOperator(
        task_id='upload_transactions',
        python_callable=dash_workers.upload_transactions,
        op_args=[uid],
        dag=DAG)
    upload_transactions.set_upstream(get_id_creds)
This results in:
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 263, in process_file
    m = imp.load_source(mod_name, filepath)
  File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/imp.py", line 172, in load_source
    module = _load(spec)
  File "<frozen importlib._bootstrap>", line 675, in _load
  File "<frozen importlib._bootstrap>", line 655, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 678, in exec_module
  File "<frozen importlib._bootstrap>", line 205, in _call_with_frames_removed
  File "/Users/aaronpolhamus/airflow/dags/dash_dag.py", line 47, in <module>
    upload_transactions.set_upstream(get_id_creds)
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 2478, in set_upstream
    self._set_relatives(task_or_task_list, upstream=True)
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 2458, in _set_relatives
    task.append_only_new(task._downstream_task_ids, self.task_id)
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 2419, in append_only_new
    ''.format(**locals()))
airflow.exceptions.AirflowException: Dependency <Task(PythonOperator): get_rfc_creds>, upload_transactions already registered

Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 28, in <module>
    args.func(args)
  File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 573, in test
    dag = dag or get_dag(args)
  File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 126, in get_dag
    'parse.'.format(args.dag_id))
airflow.exceptions.AirflowException: dag_id could not be found: dash_preproc. Either the dag did not exist or it failed to parse.
The application here is that I'm extracting a list of IDs from a SQL table using the function get_id_creds and then generating detailed data profiles on a per-ID basis. Both functions use MySqlHook internally, and I've tested each function/task in isolation to confirm that it produces the expected behavior (it does).
The crux of the error seems to be the line

airflow.exceptions.AirflowException: Dependency <Task(PythonOperator): get_rfc_creds>, upload_transactions already registered

This suggests that on the first pass through the loop the task gets "registered", and that on the second pass the parser complains because that operation has already been done. This example script
<https://github.com/trbs/airflow-examples/blob/master/dags/example_python_operator.py>
makes it look easy to do what I'm doing here: just embed your downstream task within a for loop. I have no idea why this is failing.
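For reference, my reading of the task-creation loop in that example (paraphrasing from memory, so names may not match the file exactly) is roughly this:

for i in range(10):
    task = PythonOperator(
        task_id='sleep_for_' + str(i),            # the example builds a distinct task_id per iteration
        python_callable=my_sleeping_function,     # defined earlier in that example DAG
        op_kwargs={'random_base': float(i) / 10},
        dag=dag)
    task.set_upstream(run_this)                   # run_this is the example's upstream task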
I'm set up for local parallelism with LocalExecutor. My understanding is that if I can get this working I can run multiple data-profile-generation jobs in parallel on the same machine.
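For completeness, the relevant piece of my airflow.cfg is essentially the following (connection string omitted; I'm assuming these are the only settings that matter for local parallelism):

[core]
# Run task instances as parallel local processes
executor = LocalExecutor
# LocalExecutor needs a real metadata database (MySQL/Postgres), not SQLite;
# the actual connection string is omitted here
# sql_alchemy_conn = mysql://...
# Upper bound on concurrently running task instances
parallelism = 32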
I have two questions here:
1. Where is this error coming from, and how can I get this script working?
2. Task 1 writes a list of IDs to disk, and those are then read back into the DAG for iterating over in the Task 2 execution loop. Is there a more efficient way to pass the results of Task 1 to Task 2, and can you provide a quick example? (A rough sketch of one alternative I was considering follows.)
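The alternative I had in mind for question 2 is XCom instead of the temp file. This is only a sketch based on my current understanding of the API -- the connection id and query are placeholders, and I haven't confirmed this is the recommended pattern:

from airflow.hooks.mysql_hook import MySqlHook

def get_id_creds(**kwargs):
    # Pull the ID list from MySQL and push it to XCom instead of /tmp/ids.txt
    hook = MySqlHook(mysql_conn_id='mysql_default')                        # placeholder connection id
    ids = [row[0] for row in hook.get_records('SELECT id FROM id_creds')]  # placeholder query/table
    kwargs['ti'].xcom_push(key='ids', value=ids)

def upload_transactions(**kwargs):
    # Pull the ID list that get_id_creds pushed and process each ID in turn
    ids = kwargs['ti'].xcom_pull(task_ids='get_id_creds', key='ids')
    for uid in ids:
        print('Building transactions for {}'.format(uid))                  # real per-ID work goes here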
--
Aaron Polhamus
Director of Data Science
Cel (México): +52 (55) 1951-5612
Cell (USA): +1 (206) 380-3948
Tel: +52 (55) 1168 9757 - Ext. 181
--
Please refer to our web page <https://www.credijusto.com/aviso-de-privacidad/> for more information about our privacy policies.