shanit-saha opened a new issue #9197:
URL: https://github.com/apache/airflow/issues/9197
After migrating Airflow from V1.10.2 to V1.10.10, we see a failure in one of
our DAGs, in a task of the dagrun_operator type.
A code snippet of the task is shown below. Please assume that the DAG
`dag_process_pos` exists.
```
task_trigger_dag_positional = TriggerDagRunOperator(
    trigger_dag_id="dag_process_pos",
    python_callable=set_up_dag_run_preprocessing,
    task_id="trigger_preprocess_dag",
    on_failure_callback=log_failure,
    execution_date=datetime.now(),
    provide_context=False,
    owner='airflow')


def set_up_dag_run_preprocessing(context, dag_run_obj):
    ti = context['ti']
    dag_name = context['ti'].task.trigger_dag_id
    dag_run = context['dag_run']
    trans_id = dag_run.conf['transaction_id']
    routing_info = ti.xcom_pull(task_ids="json_validation",
                                key="route_info")
    new_file_path = routing_info['file_location']
    new_file_name = os.path.basename(routing_info['new_file_name'])
    file_path = os.path.join(new_file_path, new_file_name)
    batch_id = "123-AD-FF"
    dag_run_obj.payload = {'inputfilepath': file_path,
                           'transaction_id': trans_id,
                           'Id': batch_id}
```
The DAG itself runs fine. In fact, the python callable of the task shown above
runs through to its last line. Then the task errors out:
```
[2020-06-09 11:36:22,838] {taskinstance.py:1145} ERROR - No row was found for one()
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 983, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/dagrun_operator.py", line 95, in execute
    replace_microseconds=False)
  File "/usr/local/lib/python3.6/site-packages/airflow/api/common/experimental/trigger_dag.py", line 141, in trigger_dag
    replace_microseconds=replace_microseconds,
  File "/usr/local/lib/python3.6/site-packages/airflow/api/common/experimental/trigger_dag.py", line 98, in _trigger_dag
    external_trigger=True,
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/models/dag.py", line 1471, in create_dagrun
    run.refresh_from_db()
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/models/dagrun.py", line 109, in refresh_from_db
    DR.run_id == self.run_id
  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/orm/query.py", line 3446, in one
    raise orm_exc.NoResultFound("No row was found for one()")
sqlalchemy.orm.exc.NoResultFound: No row was found for one()
```
After this, the `on_failure_callback` of that task is executed, and all the
code in that callable runs as expected. The question is: why did the
dagrun_operator fail after the python callable had completed?
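
To help rule out the callable itself, it can be exercised outside Airflow with stand-in objects. The following is only a minimal sketch under stated assumptions: `FakeTaskInstance`, the sample XCom values, and the `txn-001` transaction id are hypothetical and not taken from our environment, the unused `dag_name` lookup is left out, and the sketch returns `dag_run_obj` so the payload can be inspected (the snippet as posted above does not show a return statement).

```python
import os
from types import SimpleNamespace


def set_up_dag_run_preprocessing(context, dag_run_obj):
    # Same payload-building logic as the callable in the report.
    ti = context['ti']
    dag_run = context['dag_run']
    trans_id = dag_run.conf['transaction_id']
    routing_info = ti.xcom_pull(task_ids="json_validation",
                                key="route_info")
    new_file_path = routing_info['file_location']
    new_file_name = os.path.basename(routing_info['new_file_name'])
    file_path = os.path.join(new_file_path, new_file_name)
    batch_id = "123-AD-FF"
    dag_run_obj.payload = {'inputfilepath': file_path,
                           'transaction_id': trans_id,
                           'Id': batch_id}
    # Assumption: returned here so the caller can inspect the payload.
    return dag_run_obj


class FakeTaskInstance:
    """Hypothetical stand-in for a task instance; only xcom_pull is mimicked."""

    def __init__(self, xcom):
        self._xcom = xcom

    def xcom_pull(self, task_ids=None, key=None):
        return self._xcom[(task_ids, key)]


# Hypothetical sample data, not values from the real DAG run.
context = {
    'ti': FakeTaskInstance({
        ("json_validation", "route_info"): {
            'file_location': '/data/incoming',
            'new_file_name': 'upload/pos_file.csv',
        },
    }),
    'dag_run': SimpleNamespace(conf={'transaction_id': 'txn-001'}),
}

order = set_up_dag_run_preprocessing(context, SimpleNamespace(payload=None))
print(order.payload)
```

If this runs cleanly with data shaped like the real XCom value, the failure is more likely to sit in the DAG-run creation step shown in the traceback than in the callable.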