Uwe Jentsch created AIRFLOW-6486:
------------------------------------
Summary: SubdagOperator fails when manually set to success
Key: AIRFLOW-6486
URL: https://issues.apache.org/jira/browse/AIRFLOW-6486
Project: Apache Airflow
Issue Type: Bug
Components: DAG
Affects Versions: 1.10.7
Reporter: Uwe Jentsch
Attachments: dag_subdag_2020-01-05T04_00_00+00_00_1.log,
dag_subdag_2020-01-05T04_00_00+00_00_2.log, mvp_dag.py
Hello,
Sometimes we need to manually set specific operators of our DAG to success. After upgrading from version 1.10.1 to 1.10.7, subdag operators end up in the failed state after we mark them as success, even though all tasks inside the subdag are correctly set to the success state.
Here are the two exceptions we get; I'll attach a full log of both to this ticket.
{code:python}
[2020-01-06 11:46:09,706] {taskinstance.py:1088} ERROR - [Errno 2] No such file or directory
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs/backfill_job.py", line 788, in _execute
    session=session)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 70, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs/backfill_job.py", line 718, in _execute_for_run_dates
    session=session)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 70, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs/backfill_job.py", line 592, in _process_backfill_task_instances
    self.heartbeat()
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs/base_job.py", line 188, in heartbeat
    sleep(sleep_for)
  File "/usr/local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 932, in signal_handler
    raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib64/python3.6/multiprocessing/managers.py", line 749, in _callmethod
    conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 955, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/subdag_operator.py", line 102, in execute
    executor=self.executor)
  File "/usr/local/lib/python3.6/site-packages/airflow/models/dag.py", line 1290, in run
    job.run()
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs/base_job.py", line 221, in run
    self._execute()
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs/backfill_job.py", line 812, in _execute
    executor.end()
  File "/usr/local/lib/python3.6/site-packages/airflow/executors/local_executor.py", line 234, in end
    self.impl.end()
  File "/usr/local/lib/python3.6/site-packages/airflow/executors/local_executor.py", line 210, in end
    self.queue.put((None, None))
  File "<string>", line 2, in put
  File "/usr/lib64/python3.6/multiprocessing/managers.py", line 753, in _callmethod
    self._connect()
  File "/usr/lib64/python3.6/multiprocessing/managers.py", line 740, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/usr/lib64/python3.6/multiprocessing/connection.py", line 487, in Client
    c = SocketClient(address)
  File "/usr/lib64/python3.6/multiprocessing/connection.py", line 614, in SocketClient
    s.connect(address)
FileNotFoundError: [Errno 2] No such file or directory
[2020-01-06 11:46:09,775] {taskinstance.py:1119} INFO - Marking task as FAILED.
{code}
{code:python}
[2020-01-06 12:07:57,269] {taskinstance.py:1088} ERROR -
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs/backfill_job.py", line 788, in _execute
    session=session)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 70, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs/backfill_job.py", line 718, in _execute_for_run_dates
    session=session)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 70, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs/backfill_job.py", line 592, in _process_backfill_task_instances
    self.heartbeat()
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs/base_job.py", line 188, in heartbeat
    sleep(sleep_for)
  File "/usr/local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 932, in signal_handler
    raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 955, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/subdag_operator.py", line 102, in execute
    executor=self.executor)
  File "/usr/local/lib/python3.6/site-packages/airflow/models/dag.py", line 1290, in run
    job.run()
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs/base_job.py", line 221, in run
    self._execute()
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs/backfill_job.py", line 812, in _execute
    executor.end()
  File "/usr/local/lib/python3.6/site-packages/airflow/executors/local_executor.py", line 234, in end
    self.impl.end()
  File "/usr/local/lib/python3.6/site-packages/airflow/executors/local_executor.py", line 213, in end
    self.queue.join()
  File "<string>", line 2, in join
  File "/usr/lib64/python3.6/multiprocessing/managers.py", line 757, in _callmethod
    kind, result = conn.recv()
  File "/usr/lib64/python3.6/multiprocessing/connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "/usr/lib64/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/usr/lib64/python3.6/multiprocessing/connection.py", line 383, in _recv
    raise EOFError
EOFError
[2020-01-06 12:07:57,272] {taskinstance.py:1119} INFO - Marking task as FAILED.
{code}
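Both stack traces end in LocalExecutor.end() talking to its multiprocessing manager queue, so to us they look like generic symptoms of the manager's server process already being gone when the executor shuts down after the SIGTERM. Here is a small standalone sketch of that failure mode, with no Airflow involved; which exception is raised (FileNotFoundError, EOFError, or a ConnectionError variant) is an assumption and seems to depend on the connection state:
{code:python}
import multiprocessing

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    queue = manager.Queue()

    # Stand-in for the queue's manager process dying mid-run.
    manager.shutdown()

    try:
        # The same call LocalExecutor.end() makes: self.queue.put((None, None)).
        queue.put((None, None))
    except (FileNotFoundError, EOFError, ConnectionError) as exc:
        # Reusing a dead socket tends to give EOFError/BrokenPipeError, while a
        # fresh reconnect fails with FileNotFoundError because the manager's
        # unix socket file no longer exists (assumption, platform-dependent).
        print('manager queue unreachable:', type(exc).__name__, exc)
{code}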
If we open the subdag itself and set everything there to success, the subdag operator also ends up in the success state. But if we set an operator and its downstream tasks to success, this often marks multiple subdag operators as success at once, and those then fail as shown above.
We are using the LocalExecutor for the DAG as well as for the subdag operators. I'll attach a minimal example (mvp_dag.py) that I could use to reproduce the issue; its rough structure is sketched below.
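For readers without the attachment, the shape of such a reproduction is roughly the following (an illustrative sketch with made-up task names, not the attached mvp_dag.py itself):
{code:python}
from datetime import datetime

from airflow import DAG
from airflow.executors.local_executor import LocalExecutor
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator

DEFAULT_ARGS = {'owner': 'airflow', 'start_date': datetime(2020, 1, 1)}


def make_subdag(parent_dag_id, child_id):
    # The subdag's dag_id must be '<parent_dag_id>.<child_task_id>' in 1.10.
    subdag = DAG(dag_id='{}.{}'.format(parent_dag_id, child_id),
                 default_args=DEFAULT_ARGS, schedule_interval='@daily')
    DummyOperator(task_id='inner_task', dag=subdag)
    return subdag


with DAG('dag_subdag', default_args=DEFAULT_ARGS,
         schedule_interval='@daily') as dag:
    start = DummyOperator(task_id='start')
    section_1 = SubDagOperator(
        task_id='section_1',
        subdag=make_subdag('dag_subdag', 'section_1'),
        executor=LocalExecutor(),  # LocalExecutor for the subdag as well
    )
    section_2 = SubDagOperator(
        task_id='section_2',
        subdag=make_subdag('dag_subdag', 'section_2'),
        executor=LocalExecutor(),
    )
    start >> section_1 >> section_2
{code}
Setting start and its downstream tasks to success then marks both subdag operators as success, and those subsequently fail as described above.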